diff --git a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkErrorIT.kt b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkErrorIT.kt new file mode 100644 index 00000000..b496b7b4 --- /dev/null +++ b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkErrorIT.kt @@ -0,0 +1,920 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.sink + +import io.kotest.assertions.nondeterministic.eventually +import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder +import io.kotest.matchers.should +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import io.kotest.matchers.string.shouldContain +import java.time.Duration +import java.time.ZonedDateTime +import kotlin.time.Duration.Companion.seconds +import org.apache.kafka.common.header.Headers +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.storage.SimpleHeaderConverter +import org.junit.jupiter.api.Test +import org.neo4j.cdc.client.model.CaptureMode +import org.neo4j.cdc.client.model.ChangeEvent +import org.neo4j.cdc.client.model.ChangeIdentifier +import org.neo4j.cdc.client.model.EntityOperation +import org.neo4j.cdc.client.model.Event +import org.neo4j.cdc.client.model.Metadata +import org.neo4j.cdc.client.model.NodeEvent +import org.neo4j.cdc.client.model.NodeState +import org.neo4j.connectors.kafka.testing.TestSupport.runTest +import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier +import org.neo4j.connectors.kafka.testing.format.KafkaConverter.AVRO +import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_SCHEMA +import org.neo4j.connectors.kafka.testing.format.KafkaConverter.PROTOBUF +import org.neo4j.connectors.kafka.testing.format.KeyValueConverter +import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaConsumer +import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaProducer +import org.neo4j.connectors.kafka.testing.kafka.KafkaMessage +import org.neo4j.connectors.kafka.testing.sink.CdcSchemaStrategy +import org.neo4j.connectors.kafka.testing.sink.CdcSourceIdStrategy +import org.neo4j.connectors.kafka.testing.sink.CudStrategy +import org.neo4j.connectors.kafka.testing.sink.CypherStrategy +import org.neo4j.connectors.kafka.testing.sink.Neo4jSink +import org.neo4j.connectors.kafka.testing.sink.NodePatternStrategy +import org.neo4j.connectors.kafka.testing.sink.RelationshipPatternStrategy +import org.neo4j.connectors.kafka.testing.sink.TopicProducer +import org.neo4j.connectors.kafka.testing.source.TopicConsumer +import org.neo4j.driver.Session + +class ErrorHeaders(private val headers: Headers) { + companion object { + const val TOPIC = "__connect.errors.topic" + const val PARTITION = "__connect.errors.partition" + const val OFFSET = "__connect.errors.offset" + const val CONNECTOR_NAME = "__connect.errors.connector.name" + const val TASK_ID = "__connect.errors.task.id" + const val STAGE = "__connect.errors.stage" + const val CLASS_NAME = "__connect.errors.class.name" + const val EXCEPTION_CLASS_NAME = "__connect.errors.exception.class.name" + const val EXCEPTION_MESSAGE = "__connect.errors.exception.message" + const val EXCEPTION_STACKTRACE = "__connect.errors.exception.stacktrace" + } + + fun getValue(key: String): Any? { + return headers + .find { it.key() == key } + ?.let { + val schemaAndValue = SimpleHeaderConverter().toConnectHeader("", it.key(), it.value()) + when (key) { + TOPIC -> schemaAndValue.value() as String + PARTITION -> (schemaAndValue.value() as Byte).toInt() + OFFSET -> (schemaAndValue.value() as Byte).toInt() + CONNECTOR_NAME -> schemaAndValue.value() as String + TASK_ID -> (schemaAndValue.value() as Byte).toInt() + STAGE -> schemaAndValue.value() as String + CLASS_NAME -> schemaAndValue.value() as String + EXCEPTION_CLASS_NAME -> schemaAndValue.value() as String + EXCEPTION_MESSAGE -> schemaAndValue.value() as String? + EXCEPTION_STACKTRACE -> schemaAndValue.value() as String + else -> throw IllegalArgumentException("Unknown error key $key") + } + } + } +} + +abstract class Neo4jSinkErrorIT { + companion object { + private const val TOPIC = "topic" + private const val TOPIC_1 = "topic-1" + private const val TOPIC_2 = "topic-2" + private const val TOPIC_3 = "topic-3" + private const val DLQ_TOPIC = "dlq-topic" + } + + @Neo4jSink( + cypher = + [ + CypherStrategy( + TOPIC, + "MERGE (p:Person {id: event.id, name: event.name, surname: event.surname})")], + errorDlqTopic = DLQ_TOPIC, + enableErrorHeaders = true) + @Test + fun `should report an error with all error headers when headers are enabled`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session + ) { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + + val schemaWithMissingSurname = + SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .build() + val struct = Struct(schemaWithMissingSurname) + struct.put("id", 1L) + struct.put("name", "John") + producer.publish(valueSchema = schemaWithMissingSurname, value = struct) + + TopicVerifier.createForMap(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.TOPIC) shouldBe producer.topic + errorHeaders.getValue(ErrorHeaders.PARTITION) shouldBe 0 + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 0 + (errorHeaders.getValue(ErrorHeaders.CONNECTOR_NAME) as String) shouldContain + "Neo4jSinkConnector" + errorHeaders.getValue(ErrorHeaders.TASK_ID) shouldBe 0 + errorHeaders.getValue(ErrorHeaders.STAGE) shouldBe "TASK_PUT" + errorHeaders.getValue(ErrorHeaders.CLASS_NAME) shouldBe + "org.apache.kafka.connect.sink.SinkTask" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.driver.exceptions.ClientException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + """Cannot merge the following node because of null property value for 'surname': (:Person {surname: null})""" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_STACKTRACE) shouldNotBe null + + it.value shouldBe mapOf("id" to 1L, "name" to "John") + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + cypher = + [ + CypherStrategy( + TOPIC, + "MERGE (p:Person {id: event.id, name: event.name, surname: event.surname})")], + errorDlqTopic = DLQ_TOPIC, + enableErrorHeaders = true) + @Test + fun `should report failed events with cypher strategy`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session, + ) = runTest { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + + val schema = + SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .field("name", Schema.OPTIONAL_STRING_SCHEMA) + .field("surname", Schema.OPTIONAL_STRING_SCHEMA) + .build() + + val struct1 = Struct(schema) + struct1.put("id", 1L) + struct1.put("name", "John") + struct1.put("surname", "Doe") + val struct2ToFail = Struct(schema) + struct2ToFail.put("id", 2L) + struct2ToFail.put("surname", "Doe") + val struct3 = Struct(schema) + struct3.put("id", 3L) + struct3.put("name", "Mary") + struct3.put("surname", "Doe") + val struct4ToFail = Struct(schema) + struct4ToFail.put("id", 4L) + struct4ToFail.put("name", "Martin") + val struct5 = Struct(schema) + struct5.put("id", 5L) + struct5.put("name", "Sue") + struct5.put("surname", "Doe") + + producer.publish( + KafkaMessage(valueSchema = schema, value = struct1), + KafkaMessage(valueSchema = schema, value = struct2ToFail), + KafkaMessage(valueSchema = schema, value = struct3), + KafkaMessage(valueSchema = schema, value = struct4ToFail), + KafkaMessage(valueSchema = schema, value = struct5)) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).list().map { + it.get("n").asNode().let { n -> (n.labels() to n.asMap()) } + } shouldContainExactlyInAnyOrder + listOf( + (listOf("Person") to mapOf("id" to 1L, "name" to "John", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 3L, "name" to "Mary", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe"))) + } + + TopicVerifier.createForMap(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 1 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.driver.exceptions.ClientException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Cannot merge the following node because of null property value for 'name': (:Person {name: null})" + + it.value shouldBe mapOf("id" to 2L, "surname" to "Doe") + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 3 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.driver.exceptions.ClientException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Cannot merge the following node because of null property value for 'surname': (:Person {surname: null})" + + it.value shouldBe mapOf("id" to 4L, "name" to "Martin") + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + nodePattern = + [ + NodePatternStrategy( + TOPIC, "(:Person{!id, name, surname})", mergeNodeProperties = false)], + errorDlqTopic = DLQ_TOPIC, + enableErrorHeaders = true) + @Test + fun `should report failed events with node pattern strategy`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session, + ) = runTest { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + val message1 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 1, "name": "John", "surname": "Doe"}""") + val message2ToFail = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 2, name: "Jane", "surname": "Doe"}""") + val message3 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 3, "name": "Mary", "surname": "Doe"}""") + val message4ToFail = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, value = """{"name": "Martin", "surname": "Doe"}""") + val message5 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 5, "name": "Sue", "surname": "Doe"}""") + + producer.publish(message1, message2ToFail, message3, message4ToFail, message5) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).list().map { + it.get("n").asNode().let { n -> (n.labels() to n.asMap()) } + } shouldContainExactlyInAnyOrder + listOf( + (listOf("Person") to mapOf("id" to 1L, "name" to "John", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 3L, "name" to "Mary", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe"))) + } + + TopicVerifier.create(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 1 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.apache.kafka.connect.errors.ConnectException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Message value must be convertible to a Map." + + it.value shouldBe message2ToFail.value + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 3 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Key 'id' could not be located in the message." + + it.value shouldBe message4ToFail.value + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + relationshipPattern = + [ + RelationshipPatternStrategy( + TOPIC, + "(:Person{!id: personId})-[:OWNS]->(:Item{!id: itemId})", + mergeNodeProperties = false, + mergeRelationshipProperties = false)], + errorDlqTopic = DLQ_TOPIC, + enableErrorHeaders = true) + @Test + fun `should report failed events with relationship pattern strategy`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session, + ) = runTest { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + session.run("CREATE CONSTRAINT FOR (n:Item) REQUIRE n.id IS KEY").consume() + + val message1 = + KafkaMessage(valueSchema = Schema.STRING_SCHEMA, value = """{"personId": 1, "itemId": 1}""") + val message2 = + KafkaMessage(valueSchema = Schema.STRING_SCHEMA, value = """{"personId": 2, "itemId": 2}""") + val message3ToFail = + KafkaMessage(valueSchema = Schema.STRING_SCHEMA, value = """{personId: 3, "itemId": 3}""") + val message4 = + KafkaMessage(valueSchema = Schema.STRING_SCHEMA, value = """{"personId": 4, "itemId": 4}""") + val message5ToFail = + KafkaMessage(valueSchema = Schema.STRING_SCHEMA, value = """{"personId": 5}""") + + producer.publish(message1, message2, message3ToFail, message4, message5ToFail) + + eventually(30.seconds) { + val result1 = + session + .run( + "MATCH (p:Person {id: ${'$'}productId})-[r:OWNS]->(i:Item {id: ${'$'}itemId}) RETURN p,r,i", + mapOf("productId" to 1L, "itemId" to 1L)) + .single() + + result1.get("p").asNode() should + { + it.labels() shouldBe listOf("Person") + it.asMap() shouldBe mapOf("id" to 1L) + } + + result1.get("r").asRelationship() should { it.type() shouldBe "OWNS" } + + result1.get("i").asNode() should + { + it.labels() shouldBe listOf("Item") + it.asMap() shouldBe mapOf("id" to 1L) + } + + val result2 = + session + .run( + "MATCH (p:Person {id: ${'$'}productId})-[r:OWNS]->(i:Item {id: ${'$'}itemId}) RETURN p,r,i", + mapOf("productId" to 2L, "itemId" to 2L)) + .single() + + result2.get("p").asNode() should + { + it.labels() shouldBe listOf("Person") + it.asMap() shouldBe mapOf("id" to 2L) + } + + result2.get("r").asRelationship() should { it.type() shouldBe "OWNS" } + + result2.get("i").asNode() should + { + it.labels() shouldBe listOf("Item") + it.asMap() shouldBe mapOf("id" to 2L) + } + + val result3 = + session + .run( + "MATCH (p:Person {id: ${'$'}productId})-[r:OWNS]->(i:Item {id: ${'$'}itemId}) RETURN p,r,i", + mapOf("productId" to 4L, "itemId" to 4L)) + .single() + + result3.get("p").asNode() should + { + it.labels() shouldBe listOf("Person") + it.asMap() shouldBe mapOf("id" to 4L) + } + + result3.get("r").asRelationship() should { it.type() shouldBe "OWNS" } + + result3.get("i").asNode() should + { + it.labels() shouldBe listOf("Item") + it.asMap() shouldBe mapOf("id" to 4L) + } + } + + TopicVerifier.create(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 2 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.apache.kafka.connect.errors.ConnectException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Message value must be convertible to a Map." + + it.value shouldBe message3ToFail.value + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 4 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Key 'itemId' could not be located in the message." + + it.value shouldBe message5ToFail.value + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink(cud = [CudStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, enableErrorHeaders = true) + @Test + fun `should report failed events with cud strategy`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session, + ) = runTest { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + + val message1ToFail = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = + """{ + "type": "node", + "labels": ["Person"], + "properties": { + "id": 1, + "name": "John", + "surname": "Doe" + } + }""") + val message2 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = + """{ + "type": "node", + "op": "create", + "labels": ["Person"], + "properties": { + "id": 2, + "name": "Jane", + "surname": "Doe" + } + }""") + val message3ToFail = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = + """{ + "type": "node", + "op": "create", + "labels": ["Person"], + "properties": { + "id": 3, + "name": "Mary" + "surname": "Doe" + } + }""") + val message4 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = + """{ + "type": "node", + "op": "create", + "labels": ["Person"], + "properties": { + "id": 4, + "name": "Martin", + "surname": "Doe" + } + }""") + val message5 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = + """{ + "type": "node", + "op": "create", + "labels": ["Person"], + "properties": { + "id": 5, + "name": "Sue", + "surname": "Doe" + } + }""") + + producer.publish(message1ToFail, message2, message3ToFail, message4, message5) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).list().map { + it.get("n").asNode().let { n -> (n.labels() to n.asMap()) } + } shouldContainExactlyInAnyOrder + listOf( + (listOf("Person") to mapOf("id" to 2L, "name" to "Jane", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 4L, "name" to "Martin", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe"))) + } + + TopicVerifier.create(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 0 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "java.lang.IllegalArgumentException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Unsupported data type ('null') for CUD file operation" + + it.value shouldBe message1ToFail.value + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 2 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.apache.kafka.connect.errors.ConnectException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Message value must be convertible to a Map." + + it.value shouldBe message3ToFail.value + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + cdcSchema = [CdcSchemaStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, enableErrorHeaders = true) + @Test + fun `should report failed events with cdc schema strategy`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session + ) = runTest { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + + val event1ToFail = + newEvent( + 1, + 1, + NodeEvent( + "person1", + EntityOperation.UPDATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + null, + NodeState( + listOf("Person"), mapOf("id" to 1L, "name" to "John", "surname" to "Doe")))) + + val event2 = + newEvent( + 1, + 2, + NodeEvent( + "person1", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L))), + null, + NodeState( + listOf("Person"), mapOf("id" to 2L, "name" to "Jane", "surname" to "Doe")))) + + val event3ToFail = + newEvent( + 1, + 3, + NodeEvent( + "person1", + EntityOperation.UPDATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 3L))), + null, + NodeState( + listOf("Person"), mapOf("id" to 3L, "name" to "Mary", "surname" to "Doe")))) + + val event4 = + newEvent( + 1, + 4, + NodeEvent( + "person1", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 4L))), + null, + NodeState( + listOf("Person"), mapOf("id" to 4L, "name" to "Martin", "surname" to "Doe")))) + + val event5 = + newEvent( + 1, + 5, + NodeEvent( + "person1", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 5L))), + null, + NodeState( + listOf("Person"), mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe")))) + + producer.publish(event1ToFail, event2, event3ToFail, event4, event5) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).list().map { + it.get("n").asNode().let { n -> (n.labels() to n.asMap()) } + } shouldContainExactlyInAnyOrder + listOf( + (listOf("Person") to mapOf("id" to 2L, "name" to "Jane", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 4L, "name" to "Martin", "surname" to "Doe")), + (listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe"))) + } + + TopicVerifier.create(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 0 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "update operation requires 'before' field in the event object" + + it.value shouldBe event1ToFail + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 2 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "update operation requires 'before' field in the event object" + + it.value shouldBe event3ToFail + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + cdcSourceId = [CdcSourceIdStrategy(TOPIC, "SourceEvent", "sourceId")], + errorDlqTopic = DLQ_TOPIC, + enableErrorHeaders = true) + @Test + fun `should report failed events with cdc source id strategy`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session + ) = runTest { + session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() + + val event1 = + newEvent( + 1, + 1, + NodeEvent( + "person1", + EntityOperation.CREATE, + listOf("SourceEvent"), + emptyMap(), + null, + NodeState( + listOf("SourceEvent"), + mapOf("id" to 1L, "name" to "John", "surname" to "Doe")))) + + val event2ToFail = + newEvent( + 1, + 2, + NodeEvent( + "person2", + EntityOperation.UPDATE, + listOf("SourceEvent"), + emptyMap(), + null, + NodeState( + listOf("SourceEvent"), + mapOf("id" to 2L, "name" to "Jane", "surname" to "Doe")))) + + val event3 = + newEvent( + 1, + 3, + NodeEvent( + "person3", + EntityOperation.CREATE, + listOf("SourceEvent"), + emptyMap(), + null, + NodeState( + listOf("SourceEvent"), mapOf("id" to 3, "name" to "Mary", "surname" to "Doe")))) + + val event4 = + newEvent( + 1, + 4, + NodeEvent( + "person4", + EntityOperation.CREATE, + listOf("SourceEvent"), + emptyMap(), + null, + NodeState( + listOf("SourceEvent"), + mapOf("id" to 4L, "name" to "Martin", "surname" to "Doe")))) + + val event5ToFail = + newEvent( + 1, + 5, + NodeEvent( + "person5", + EntityOperation.UPDATE, + listOf("SourceEvent"), + emptyMap(), + null, + NodeState( + listOf("SourceEvent"), mapOf("id" to 5L, "name" to "Sue", "surname" to "*")))) + + producer.publish(event1, event2ToFail, event3, event4, event5ToFail) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).list().map { + it.get("n").asNode().let { n -> (n.labels() to n.asMap()) } + } shouldContainExactlyInAnyOrder + listOf( + (listOf("SourceEvent") to + mapOf("sourceId" to "person1", "id" to 1L, "name" to "John", "surname" to "Doe")), + (listOf("SourceEvent") to + mapOf("sourceId" to "person3", "id" to 3L, "name" to "Mary", "surname" to "Doe")), + (listOf("SourceEvent") to + mapOf( + "sourceId" to "person4", "id" to 4L, "name" to "Martin", "surname" to "Doe"))) + } + + TopicVerifier.create(errorConsumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 1 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "update operation requires 'before' field in the event object" + + it.value shouldBe event2ToFail + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 4 + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "update operation requires 'before' field in the event object" + + it.value shouldBe event5ToFail + } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + nodePattern = + [NodePatternStrategy(TOPIC, "(:User{!id, name, surname})", mergeNodeProperties = false)], + errorTolerance = "none", + errorDlqTopic = DLQ_TOPIC) + @Test + fun `should stop the process and only report first failed event when error tolerance is none`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, + session: Session, + ) = runTest { + val message1 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 1, "name": "John", "surname": "Doe"}""") + val message2ToFail = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 2, name: "Jane", "surname": "Doe"}""") + val message3 = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 3, name: "Mary", "surname": "Doe"}""") + + producer.publish(message1, message2ToFail, message3) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).single().get("n").asNode() should + { + it.labels() shouldBe listOf("User") + it.asMap() shouldBe mapOf("id" to 1L, "name" to "John", "surname" to "Doe") + } + } + + TopicVerifier.create(errorConsumer) + .assertMessageValue { it shouldBe message2ToFail.value } + .verifyWithin(Duration.ofSeconds(30)) + } + + @Neo4jSink( + cud = [CudStrategy(TOPIC_1)], + cypher = + [ + CypherStrategy( + TOPIC_2, + "MERGE (p:Person {id: event.id, name: event.name, surname: event.surname})")], + nodePattern = + [ + NodePatternStrategy( + TOPIC_3, "(:Person{!id, name, surname})", mergeNodeProperties = false)], + errorDlqTopic = DLQ_TOPIC, + enableErrorHeaders = true) + @Test + fun `should report failed events from different topics`( + @TopicProducer(TOPIC_1) producer1: ConvertingKafkaProducer, + @TopicProducer(TOPIC_2) producer2: ConvertingKafkaProducer, + @TopicProducer(TOPIC_3) producer3: ConvertingKafkaProducer, + @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer, + session: Session, + ) = runTest { + val cudMessageToFail = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = + """{ + "type": "node", + "labels": ["User"], + "properties": { + "id": 1, + "name": "John", + "surname": "Doe" + } + }""") + + val cypherMessage = + KafkaMessage( + valueSchema = Schema.STRING_SCHEMA, + value = """{"id": 1, "name": "John", "surname": "Doe"}""") + + val nodePatternMessageToFail = + KafkaMessage(valueSchema = Schema.STRING_SCHEMA, value = """{}""") + + producer1.publish(cudMessageToFail) + producer2.publish(cypherMessage) + producer3.publish(nodePatternMessageToFail) + + eventually(30.seconds) { + session.run("MATCH (n) RETURN n", emptyMap()).single().get("n").asNode() should + { + it.labels() shouldBe listOf("Person") + it.asMap() shouldBe mapOf("id" to 1L, "name" to "John", "surname" to "Doe") + } + } + + TopicVerifier.create(consumer) + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.TOPIC) shouldBe producer1.topic + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "java.lang.IllegalArgumentException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Unsupported data type ('null') for CUD file operation" + + it.value shouldBe cudMessageToFail.value + } + .assertMessage { + val errorHeaders = ErrorHeaders(it.raw.headers()) + errorHeaders.getValue(ErrorHeaders.TOPIC) shouldBe producer3.topic + errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe + "org.neo4j.connectors.kafka.exceptions.InvalidDataException" + errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe + "Key 'id' could not be located in the message." + + it.value shouldBe nodePatternMessageToFail.value + } + .verifyWithin(Duration.ofSeconds(30)) + } + + private fun newEvent(txId: Long, seq: Int, event: Event): ChangeEvent = + ChangeEvent( + ChangeIdentifier("$txId:$seq"), + txId, + seq, + Metadata( + "neo4j", + "neo4j", + "server-id", + CaptureMode.DIFF, + "bolt", + "localhost:32000", + "localhost:7687", + ZonedDateTime.now().minusSeconds(5), + ZonedDateTime.now(), + emptyMap(), + emptyMap()), + event) +} + +@KeyValueConverter(key = AVRO, value = AVRO) class Neo4jSinkErrorAvroIT : Neo4jSinkErrorIT() + +@KeyValueConverter(key = JSON_SCHEMA, value = JSON_SCHEMA) +class Neo4jSinkErrorJsonIT : Neo4jSinkErrorIT() + +@KeyValueConverter(key = PROTOBUF, value = PROTOBUF) +class Neo4jSinkErrorProtobufIT : Neo4jSinkErrorIT() diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt index 83699a5b..817ee586 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt @@ -48,15 +48,32 @@ class Neo4jSinkTask : SinkTask() { ?.map { SinkMessage(it) } ?.groupBy { it.topic } ?.mapKeys { topicHandlers.getValue(it.key) } - ?.forEach { (handler, messages) -> - val txGroups = handler.handle(messages) - - txGroups.forEach { group -> - config.session().use { session -> - session.writeTransaction( - { tx -> group.forEach { tx.run(it.query).consume() } }, config.txConfig()) - } - } + ?.forEach { (handler, messages) -> processMessages(handler, messages) } + } + + private fun processMessages(handler: SinkStrategyHandler, messages: List) { + val handled = mutableSetOf() + try { + val txGroups = handler.handle(messages) + + txGroups.forEach { group -> + config.session().use { session -> + session.writeTransaction( + { tx -> group.forEach { tx.run(it.query).consume() } }, + config.txConfig(), + ) } + + handled.addAll(group.flatMap { it.messages }) + } + } catch (e: Throwable) { + val unhandled = messages.minus(handled) + + if (unhandled.size > 1) { + unhandled.forEach { m -> processMessages(handler, listOf(m)) } + } else { + unhandled.forEach { m -> context.errantRecordReporter()?.report(m.record, e)?.get() } + } + } } } diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt index 04777944..f6e2237b 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt @@ -102,7 +102,12 @@ enum class SinkStrategy(val description: String) { RELATIONSHIP_PATTERN("relationship-pattern") } -data class ChangeQuery(val txId: Long?, val seq: Int?, val query: Query) +data class ChangeQuery( + val txId: Long?, + val seq: Int?, + val messages: Iterable, + val query: Query +) interface SinkStrategyHandler { diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcHandler.kt index 2973a786..18fd2d6d 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcHandler.kt @@ -35,19 +35,21 @@ import org.slf4j.LoggerFactory abstract class CdcHandler : SinkStrategyHandler { private val logger: Logger = LoggerFactory.getLogger(javaClass) + data class MessageToEvent(val message: SinkMessage, val changeEvent: ChangeEvent) + override fun handle(messages: Iterable): Iterable> { return messages .onEach { logger.trace("received message: {}", it) } - .map { it.toChangeEvent() } - .map { it.txId to it } - .onEach { logger.trace("converted message: {} to {}", it.first, it.second) } + .map { MessageToEvent(it, it.toChangeEvent()) } + .onEach { logger.trace("converted message: {} to {}", it.changeEvent.txId, it.changeEvent) } .groupBy( - { it.first }, + { it.changeEvent.txId }, { ChangeQuery( - it.second.txId, - it.second.seq, - when (val event = it.second.event) { + it.changeEvent.txId, + it.changeEvent.seq, + listOf(it.message), + when (val event = it.changeEvent.event) { is NodeEvent -> when (event.operation) { EntityOperation.CREATE -> transformCreate(event) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt index a3e991b0..477a524c 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt @@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy import org.neo4j.cdc.client.model.NodeEvent import org.neo4j.cdc.client.model.RelationshipEvent +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.SinkStrategy import org.neo4j.cypherdsl.core.Cypher import org.neo4j.cypherdsl.core.Node @@ -30,6 +31,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH override fun strategy() = SinkStrategy.CDC_SCHEMA override fun transformCreate(event: NodeEvent): Query { + if (event.after == null) { + throw InvalidDataException("create operation requires 'after' field in the event object") + } + val node = buildNode(event.keys, "n") val stmt = Cypher.merge(node) @@ -48,6 +53,13 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH } override fun transformUpdate(event: NodeEvent): Query { + if (event.before == null) { + throw InvalidDataException("update operation requires 'before' field in the event object") + } + if (event.after == null) { + throw InvalidDataException("update operation requires 'after' field in the event object") + } + val node = buildNode(event.keys, "n") val stmt = Cypher.merge(node) @@ -81,6 +93,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH } override fun transformCreate(event: RelationshipEvent): Query { + if (event.after == null) { + throw InvalidDataException("create operation requires 'after' field in the event object") + } + val (start, end, rel) = buildRelationship(event, "r") val stmt = Cypher.merge(start) @@ -93,6 +109,13 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH } override fun transformUpdate(event: RelationshipEvent): Query { + if (event.before == null) { + throw InvalidDataException("update operation requires 'before' field in the event object") + } + if (event.after == null) { + throw InvalidDataException("update operation requires 'after' field in the event object") + } + val (start, end, rel) = buildRelationship(event, "r") val stmt = Cypher.merge(start) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandler.kt index 4ecc7ffd..a230f358 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandler.kt @@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy import org.neo4j.cdc.client.model.NodeEvent import org.neo4j.cdc.client.model.RelationshipEvent +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.SinkStrategy import org.neo4j.cypherdsl.core.Cypher import org.neo4j.cypherdsl.core.Node @@ -35,6 +36,10 @@ class CdcSourceIdHandler( override fun strategy() = SinkStrategy.CDC_SOURCE_ID override fun transformCreate(event: NodeEvent): Query { + if (event.after == null) { + throw InvalidDataException("create operation requires 'after' field in the event object") + } + val node = buildNode(event.elementId, "n") val stmt = Cypher.merge(node) @@ -52,6 +57,13 @@ class CdcSourceIdHandler( } override fun transformUpdate(event: NodeEvent): Query { + if (event.before == null) { + throw InvalidDataException("update operation requires 'before' field in the event object") + } + if (event.after == null) { + throw InvalidDataException("update operation requires 'after' field in the event object") + } + val node = buildNode(event.elementId, "n") val stmt = Cypher.merge(node) @@ -85,6 +97,10 @@ class CdcSourceIdHandler( } override fun transformCreate(event: RelationshipEvent): Query { + if (event.after == null) { + throw InvalidDataException("create operation requires 'after' field in the event object") + } + val (start, end, rel) = buildRelationship(event, "r") val stmt = Cypher.merge(start) @@ -97,6 +113,13 @@ class CdcSourceIdHandler( } override fun transformUpdate(event: RelationshipEvent): Query { + if (event.before == null) { + throw InvalidDataException("update operation requires 'before' field in the event object") + } + if (event.after == null) { + throw InvalidDataException("update operation requires 'after' field in the event object") + } + val (start, end, rel) = buildRelationship(event, "r") val stmt = Cypher.merge(start) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandler.kt index c4866994..c23b3add 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandler.kt @@ -23,15 +23,19 @@ import org.neo4j.connectors.kafka.sink.SinkStrategy import org.neo4j.connectors.kafka.sink.SinkStrategyHandler import org.neo4j.connectors.kafka.sink.strategy.cud.Operation import org.neo4j.cypherdsl.core.renderer.Renderer +import org.neo4j.driver.Query import org.slf4j.Logger import org.slf4j.LoggerFactory class CudHandler(val topic: String, private val renderer: Renderer, private val batchSize: Int) : SinkStrategyHandler { + private val logger: Logger = LoggerFactory.getLogger(javaClass) override fun strategy() = SinkStrategy.CUD + data class MessageToQuery(val message: SinkMessage, val query: Query) + @Suppress("UNCHECKED_CAST") override fun handle(messages: Iterable): Iterable> { return messages @@ -44,10 +48,10 @@ class CudHandler(val topic: String, private val renderer: Renderer, private val else -> throw ConnectException("Message value must be convertible to a Map.") } val cud = Operation.from(value) - cud.toQuery(renderer) + MessageToQuery(it, cud.toQuery(renderer)) } .chunked(batchSize) - .map { it.map { q -> ChangeQuery(null, null, q) } } + .map { it.map { data -> ChangeQuery(null, null, listOf(data.message), data.query) } } .onEach { logger.trace("mapped messages: '{}'", it) } .toList() } diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandler.kt index c25974af..b019330a 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandler.kt @@ -81,6 +81,8 @@ class CypherHandler( override fun strategy() = SinkStrategy.CYPHER + data class MessageToEventMap(val message: SinkMessage, val eventMap: Map) + override fun handle(messages: Iterable): Iterable> { return messages .asSequence() @@ -96,10 +98,17 @@ class CypherHandler( logger.trace("message '{}' mapped to: '{}'", it, mapped) - mapped + MessageToEventMap(it, mapped) } .chunked(batchSize) - .map { listOf(ChangeQuery(null, null, Query(rewrittenQuery, mapOf("events" to it)))) } + .map { + listOf( + ChangeQuery( + null, + null, + it.map { data -> data.message }, + Query(rewrittenQuery, mapOf("events" to it.map { data -> data.eventMap })))) + } .onEach { logger.trace("mapped messages: '{}'", it) } .toList() } diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt index d270a873..099a3815 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt @@ -65,6 +65,8 @@ class NodePatternHandler( override fun strategy() = SinkStrategy.NODE_PATTERN + data class MessageToEventList(val message: SinkMessage, val eventList: List) + override fun handle(messages: Iterable): Iterable> { return messages .asSequence() @@ -87,10 +89,17 @@ class NodePatternHandler( logger.trace("message '{}' mapped to: '{}'", it, mapped) - mapped + MessageToEventList(it, mapped) } .chunked(batchSize) - .map { listOf(ChangeQuery(null, null, Query(query, mapOf(EVENTS to it)))) } + .map { + listOf( + ChangeQuery( + null, + null, + it.map { data -> data.message }, + Query(query, mapOf(EVENTS to it.map { data -> data.eventList })))) + } .onEach { logger.trace("mapped messages: '{}'", it) } .toList() } diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt index 090539ca..0a97b0e7 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt @@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.sink.strategy import java.time.Instant import java.time.ZoneOffset import org.apache.kafka.connect.errors.ConnectException +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.SinkConfiguration import org.neo4j.connectors.kafka.sink.SinkMessage import org.neo4j.connectors.kafka.sink.SinkStrategyHandler @@ -96,17 +97,31 @@ abstract class PatternHandler( .mapValues { (_, mapping) -> if (isExplicit(mapping.from)) { val newKey = if (isTombstone) replaceValueWithKey(mapping.from) else mapping.from - usedTracker += newKey - return@mapValues flattened[newKey] - } - - for (prefix in prefixes) { - val key = "$prefix.${mapping.from}" + if (flattened.containsKey(newKey)) { + usedTracker += newKey + return@mapValues flattened[newKey] + } + if (mapping.from.startsWith("$bindKeyAs.")) { + throw InvalidDataException( + "Key '${mapping.from.replace("$bindKeyAs.", "")}' could not be located in the keys.", + ) + } else { + throw InvalidDataException( + "Key '${mapping.from.replace("$bindValueAs.", "")}' could not be located in the values.", + ) + } + } else { + for (prefix in prefixes) { + val key = "$prefix.${mapping.from}" - if (flattened.containsKey(key)) { - usedTracker += key - return@mapValues flattened[key] + if (flattened.containsKey(key)) { + usedTracker += key + return@mapValues flattened[key] + } } + throw InvalidDataException( + "Key '${mapping.from}' could not be located in the message.", + ) } } diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RedirectingHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RedirectingHandler.kt deleted file mode 100644 index 02c574ab..00000000 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RedirectingHandler.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.connectors.kafka.sink.strategy - -import org.neo4j.connectors.kafka.sink.ChangeQuery -import org.neo4j.connectors.kafka.sink.SinkMessage -import org.neo4j.connectors.kafka.sink.SinkStrategyHandler -import org.neo4j.connectors.kafka.sink.legacy.strategy.IngestionStrategy -import org.neo4j.connectors.kafka.sink.utils.toStreamsSinkEntity -import org.neo4j.driver.Query - -abstract class RedirectingHandler( - private val original: IngestionStrategy, - private val batchSize: Int -) : SinkStrategyHandler { - override fun handle(messages: Iterable): Iterable> { - val events = messages.map { it.record.toStreamsSinkEntity() } - - return (original.mergeNodeEvents(events) + - original.deleteNodeEvents(events) + - original.mergeRelationshipEvents(events) + - original.deleteRelationshipEvents(events)) - .map { q -> ChangeQuery(null, null, Query(q.query, mapOf("events" to q.events))) } - .chunked(batchSize) - } -} diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt index ba63ab8f..1c93a675 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt @@ -66,6 +66,8 @@ class RelationshipPatternHandler( override fun strategy() = SinkStrategy.RELATIONSHIP_PATTERN + data class MessageToEventList(val message: SinkMessage, val eventList: List) + override fun handle(messages: Iterable): Iterable> { return messages .asSequence() @@ -104,10 +106,17 @@ class RelationshipPatternHandler( logger.trace("message '{}' mapped to: '{}'", it, mapped) - mapped + MessageToEventList(it, mapped) } .chunked(batchSize) - .map { listOf(ChangeQuery(null, null, Query(query, mapOf(EVENTS to it)))) } + .map { + listOf( + ChangeQuery( + null, + null, + it.map { data -> data.message }, + Query(query, mapOf(EVENTS to it.map { data -> data.eventList })))) + } .onEach { logger.trace("mapped messages: '{}'", it) } .toList() } diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt index 6e9884f3..cbf077b0 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt @@ -23,12 +23,14 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.throwable.shouldHaveMessage import java.time.LocalDate import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.neo4j.cdc.client.model.EntityOperation import org.neo4j.cdc.client.model.Node import org.neo4j.cdc.client.model.NodeEvent import org.neo4j.cdc.client.model.NodeState import org.neo4j.cdc.client.model.RelationshipEvent import org.neo4j.cdc.client.model.RelationshipState +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.connectors.kafka.sink.SinkMessage import org.neo4j.connectors.kafka.sink.strategy.TestUtils.newChangeEventMessage @@ -112,28 +114,28 @@ class CdcSchemaHandlerTest { @Test fun `should generate correct statement for node creation events`() { - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.CREATE, + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + null, + NodeState( listOf("Person"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - null, - NodeState( - listOf("Person"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 1, - 0)), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 1, + 0) + verify( + listOf(sinkMessage), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage), Query( "MERGE (n:`Person` {name: ${'$'}nName, surname: ${'$'}nSurname}) SET n = ${'$'}nProps", mapOf( @@ -145,28 +147,28 @@ class CdcSchemaHandlerTest { "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.CREATE, + val sinkMessage1 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + listOf("Person", "Employee"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + null, + NodeState( listOf("Person", "Employee"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - null, - NodeState( - listOf("Person", "Employee"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 1, - 0)), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 1, + 0) + verify( + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage1), Query( "MERGE (n:`Person` {name: ${'$'}nName, surname: ${'$'}nSurname}) SET n = ${'$'}nProps SET n:`Employee`", mapOf( @@ -181,29 +183,28 @@ class CdcSchemaHandlerTest { @Test fun `should generate correct statement for node update events`() { - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.UPDATE, + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.UPDATE, + listOf("Person", "Employee"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + NodeState(listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe")), + NodeState( listOf("Person", "Employee"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - NodeState( - listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe")), - NodeState( - listOf("Person", "Employee"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 1, - 0)), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 1, + 0) + verify( + listOf(sinkMessage), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage), Query( "MERGE (n:`Person` {name: ${'$'}nName, surname: ${'$'}nSurname}) SET n += ${'$'}nProps", mapOf( @@ -212,30 +213,30 @@ class CdcSchemaHandlerTest { "nProps" to mapOf("name" to "john", "dob" to LocalDate.of(1990, 1, 1)))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.UPDATE, + val sinkMessage1 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.UPDATE, + listOf("Person", "Employee"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + NodeState( listOf("Person", "Employee"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - NodeState( - listOf("Person", "Employee"), - mapOf("name" to "joe", "surname" to "doe", "married" to true)), - NodeState( - listOf("Person", "Manager"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 1, - 0)), + mapOf("name" to "joe", "surname" to "doe", "married" to true)), + NodeState( + listOf("Person", "Manager"), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 1, + 0) + verify( + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage1), Query( "MERGE (n:`Person` {name: ${'$'}nName, surname: ${'$'}nSurname}) SET n += ${'$'}nProps SET n:`Manager` REMOVE n:`Employee`", mapOf( @@ -247,32 +248,32 @@ class CdcSchemaHandlerTest { "dob" to LocalDate.of(1990, 1, 1), "married" to null))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.UPDATE, + val sinkMessage2 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.UPDATE, + listOf("Person", "Employee"), + mapOf( + "Person" to listOf(mapOf("name" to "john", "surname" to "doe")), + "Employee" to listOf(mapOf("id" to 5000L))), + NodeState( listOf("Person", "Employee"), + mapOf("name" to "joe", "surname" to "doe", "married" to true)), + NodeState( + listOf("Person", "Manager"), mapOf( - "Person" to listOf(mapOf("name" to "john", "surname" to "doe")), - "Employee" to listOf(mapOf("id" to 5000L))), - NodeState( - listOf("Person", "Employee"), - mapOf("name" to "joe", "surname" to "doe", "married" to true)), - NodeState( - listOf("Person", "Manager"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 1, - 0)), + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 1, + 0) + verify( + listOf(sinkMessage2), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage2), Query( "MERGE (n:`Person`:`Employee` {name: ${'$'}nName, surname: ${'$'}nSurname, id: ${'$'}nId}) SET n += ${'$'}nProps SET n:`Manager` REMOVE n:`Employee`", mapOf( @@ -288,28 +289,27 @@ class CdcSchemaHandlerTest { @Test fun `should generate correct statement for node deletion events`() { + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.DELETE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "joe", "surname" to "doe"))), + NodeState( + emptyList(), + mapOf("name" to "joe", "surname" to "doe", "dob" to LocalDate.of(2000, 1, 1))), + null), + 1, + 0) verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.DELETE, - listOf("Person"), - mapOf("Person" to listOf(mapOf("name" to "joe", "surname" to "doe"))), - NodeState( - emptyList(), - mapOf( - "name" to "joe", - "surname" to "doe", - "dob" to LocalDate.of(2000, 1, 1))), - null), - 1, - 0)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage), Query( "MATCH (n:`Person` {name: ${'$'}nName, surname: ${'$'}nSurname}) DETACH DELETE n", mapOf("nName" to "joe", "nSurname" to "doe")))))) @@ -317,31 +317,33 @@ class CdcSchemaHandlerTest { @Test fun `should generate correct statement for relationship creation events`() { + val sinkMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "KNOWS", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.CREATE, + null, + RelationshipState(mapOf("since" to LocalDate.of(2000, 1, 1)))), + 1, + 0) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "KNOWS", - Node( - "start-element-id", - listOf("Person"), - mapOf("Person" to listOf(mapOf("id" to 1L)))), - Node( - "end-element-id", - listOf("Person"), - mapOf("Person" to listOf(mapOf("id" to 2L)))), - emptyList(), - EntityOperation.CREATE, - null, - RelationshipState(mapOf("since" to LocalDate.of(2000, 1, 1)))), - 1, - 0)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage), Query( "MERGE (start:`Person` {id: ${'$'}startId}) " + "MERGE (end:`Person` {id: ${'$'}endId}) " + @@ -355,35 +357,37 @@ class CdcSchemaHandlerTest { @Test fun `should generate correct statement for relationship update events`() { + val sinkMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "KNOWS", + Node( + "start-element-id", + listOf("Person", "Employee"), + mapOf( + "Person" to listOf(mapOf("id" to 1L)), + "Employee" to listOf(mapOf("contractId" to 5000L)))), + Node( + "end-element-id", + listOf("Person", "Employee"), + mapOf( + "Person" to listOf(mapOf("id" to 2L)), + "Employee" to listOf(mapOf("contractId" to 5001L)))), + emptyList(), + EntityOperation.UPDATE, + RelationshipState(mapOf("since" to LocalDate.of(2000, 1, 1))), + RelationshipState(mapOf("since" to LocalDate.of(1999, 1, 1)))), + 1, + 0) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "KNOWS", - Node( - "start-element-id", - listOf("Person", "Employee"), - mapOf( - "Person" to listOf(mapOf("id" to 1L)), - "Employee" to listOf(mapOf("contractId" to 5000L)))), - Node( - "end-element-id", - listOf("Person", "Employee"), - mapOf( - "Person" to listOf(mapOf("id" to 2L)), - "Employee" to listOf(mapOf("contractId" to 5001L)))), - emptyList(), - EntityOperation.UPDATE, - RelationshipState(mapOf("since" to LocalDate.of(2000, 1, 1))), - RelationshipState(mapOf("since" to LocalDate.of(1999, 1, 1)))), - 1, - 0)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage), Query( "MERGE (start:`Person`:`Employee` {id: ${'$'}startId, contractId: ${'$'}startContractId}) " + "MERGE (end:`Person`:`Employee` {id: ${'$'}endId, contractId: ${'$'}endContractId}) " + @@ -396,33 +400,35 @@ class CdcSchemaHandlerTest { "endContractId" to 5001L, "rProps" to mapOf("since" to LocalDate.of(1999, 1, 1)))))))) + val sinkMessage1 = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "KNOWS", + Node( + "start-element-id", + listOf("Person", "Employee"), + mapOf( + "Person" to listOf(mapOf("id" to 1L)), + "Employee" to listOf(mapOf("contractId" to 5000L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + listOf(mapOf("id" to 1001L)), + EntityOperation.UPDATE, + RelationshipState(mapOf("name" to "john", "surname" to "doe")), + RelationshipState(mapOf("name" to "joe", "surname" to "doe"))), + 1, + 0) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "KNOWS", - Node( - "start-element-id", - listOf("Person", "Employee"), - mapOf( - "Person" to listOf(mapOf("id" to 1L)), - "Employee" to listOf(mapOf("contractId" to 5000L)))), - Node( - "end-element-id", - listOf("Person"), - mapOf("Person" to listOf(mapOf("id" to 2L)))), - listOf(mapOf("id" to 1001L)), - EntityOperation.UPDATE, - RelationshipState(mapOf("name" to "john", "surname" to "doe")), - RelationshipState(mapOf("name" to "joe", "surname" to "doe"))), - 1, - 0)), + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage1), Query( "MERGE (start:`Person`:`Employee` {id: ${'$'}startId, contractId: ${'$'}startContractId}) " + "MERGE (end:`Person` {id: ${'$'}endId}) " + @@ -438,31 +444,33 @@ class CdcSchemaHandlerTest { @Test fun `should generate correct statement for relationship deletion events`() { + val sinkMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "KNOWS", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.DELETE, + RelationshipState(mapOf("name" to "john", "surname" to "doe")), + null), + 1, + 0) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "KNOWS", - Node( - "start-element-id", - listOf("Person"), - mapOf("Person" to listOf(mapOf("id" to 1L)))), - Node( - "end-element-id", - listOf("Person"), - mapOf("Person" to listOf(mapOf("id" to 2L)))), - emptyList(), - EntityOperation.DELETE, - RelationshipState(mapOf("name" to "john", "surname" to "doe")), - null), - 1, - 0)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage), Query( "MATCH (start:`Person` {id: ${'$'}startId}) " + "MATCH (end:`Person` {id: ${'$'}endId}) " + @@ -470,33 +478,35 @@ class CdcSchemaHandlerTest { "DELETE r", mapOf("startId" to 1L, "endId" to 2L)))))) + val sinkMessage1 = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "KNOWS", + Node( + "start-element-id", + listOf("Person", "Employee"), + mapOf( + "Person" to listOf(mapOf("id" to 1L)), + "Employee" to listOf(mapOf("contractId" to 5000L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + listOf(mapOf("id" to 1001L)), + EntityOperation.DELETE, + RelationshipState(mapOf("name" to "john", "surname" to "doe")), + null), + 1, + 0) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "KNOWS", - Node( - "start-element-id", - listOf("Person", "Employee"), - mapOf( - "Person" to listOf(mapOf("id" to 1L)), - "Employee" to listOf(mapOf("contractId" to 5000L)))), - Node( - "end-element-id", - listOf("Person"), - mapOf("Person" to listOf(mapOf("id" to 2L)))), - listOf(mapOf("id" to 1001L)), - EntityOperation.DELETE, - RelationshipState(mapOf("name" to "john", "surname" to "doe")), - null), - 1, - 0)), + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 1, 0, + listOf(sinkMessage1), Query( "MATCH (start:`Person`:`Employee` {id: ${'$'}startId, contractId: ${'$'}startContractId}) " + "MATCH (end:`Person` {id: ${'$'}endId}) " + @@ -566,6 +576,157 @@ class CdcSchemaHandlerTest { }) } + @Test + fun `should fail on null 'after' field with node create operation`() { + val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + val nodeChangeEventMessage = + newChangeEventMessage( + NodeEvent( + "person1", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + null, + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(nodeChangeEventMessage)) + } shouldHaveMessage "create operation requires 'after' field in the event object" + } + + @Test + fun `should fail on null 'after' field with relationship create operation`() { + val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + val relationshipChangeEventMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.CREATE, + null, + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(relationshipChangeEventMessage)) + } shouldHaveMessage "create operation requires 'after' field in the event object" + } + + @Test + fun `should fail on null 'before' field with node update operation`() { + val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + val nodeChangeEventMessage = + newChangeEventMessage( + NodeEvent( + "person1", + EntityOperation.UPDATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + null, + NodeState( + listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe"))), + 1, + 0) + + assertThrows { + handler.handle(listOf(nodeChangeEventMessage)) + } shouldHaveMessage "update operation requires 'before' field in the event object" + } + + @Test + fun `should fail on null 'before' field with relationship update operation`() { + val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + val relationshipChangeEventMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.UPDATE, + null, + RelationshipState(mapOf("name" to "john", "surname" to "doe"))), + 1, + 0) + + assertThrows { + handler.handle(listOf(relationshipChangeEventMessage)) + } shouldHaveMessage "update operation requires 'before' field in the event object" + } + + @Test + fun `should fail on null 'after' field with node update operation`() { + val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + val nodeChangeEventMessage = + newChangeEventMessage( + NodeEvent( + "person1", + EntityOperation.UPDATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + NodeState(listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe")), + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(nodeChangeEventMessage)) + } shouldHaveMessage "update operation requires 'after' field in the event object" + } + + @Test + fun `should fail on null 'after' field with relationship update operation`() { + val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + val relationshipChangeEventMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.UPDATE, + RelationshipState(mapOf("name" to "john", "surname" to "doe")), + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(relationshipChangeEventMessage)) + } shouldHaveMessage "update operation requires 'after' field in the event object" + } + private fun verify(messages: Iterable, expected: Iterable>) { val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandlerTest.kt index 605a847d..953dff7d 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandlerTest.kt @@ -19,14 +19,17 @@ package org.neo4j.connectors.kafka.sink.strategy import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldMatchInOrder import io.kotest.matchers.shouldBe +import io.kotest.matchers.throwable.shouldHaveMessage import java.time.LocalDate import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.neo4j.cdc.client.model.EntityOperation import org.neo4j.cdc.client.model.Node import org.neo4j.cdc.client.model.NodeEvent import org.neo4j.cdc.client.model.NodeState import org.neo4j.cdc.client.model.RelationshipEvent import org.neo4j.cdc.client.model.RelationshipState +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.connectors.kafka.sink.SinkMessage import org.neo4j.connectors.kafka.sink.strategy.TestUtils.newChangeEventMessage @@ -39,51 +42,53 @@ class CdcSourceIdHandlerTest { @Test fun `should generate correct statement for node creation events`() { + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + emptyList(), + emptyMap(), + null, + NodeState(emptyList(), mapOf("name" to "john", "surname" to "doe"))), + 0, + 1) verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.CREATE, - emptyList(), - emptyMap(), - null, - NodeState(emptyList(), mapOf("name" to "john", "surname" to "doe"))), - 0, - 1)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage), Query( "MERGE (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) SET n += ${'$'}nProps", mapOf( "nElementId" to "node-element-id", "nProps" to mapOf("name" to "john", "surname" to "doe"))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.CREATE, + val sinkMessage1 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + null, + NodeState( listOf("Person"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - null, - NodeState( - listOf("Person"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 0, - 1)), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 0, + 1) + verify( + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage1), Query( "MERGE (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) SET n += ${'$'}nProps SET n:`Person`", mapOf( @@ -94,28 +99,28 @@ class CdcSourceIdHandlerTest { "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.CREATE, + val sinkMessage2 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + listOf("Person", "Employee"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + null, + NodeState( listOf("Person", "Employee"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - null, - NodeState( - listOf("Person", "Employee"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 0, - 1)), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 0, + 1) + verify( + listOf(sinkMessage2), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage2), Query( "MERGE (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) SET n += ${'$'}nProps SET n:`Person`:`Employee`", mapOf( @@ -129,24 +134,25 @@ class CdcSourceIdHandlerTest { @Test fun `should generate correct statement for node update events`() { + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.UPDATE, + emptyList(), + emptyMap(), + NodeState(emptyList(), mapOf("name" to "john")), + NodeState(emptyList(), mapOf("name" to "joe", "dob" to LocalDate.of(2000, 1, 1)))), + 0, + 1) verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.UPDATE, - emptyList(), - emptyMap(), - NodeState(emptyList(), mapOf("name" to "john")), - NodeState( - emptyList(), mapOf("name" to "joe", "dob" to LocalDate.of(2000, 1, 1)))), - 0, - 1)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage), Query( "MERGE (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) SET n += ${'$'}nProps", mapOf( @@ -154,29 +160,28 @@ class CdcSourceIdHandlerTest { "nProps" to mapOf("name" to "joe", "dob" to LocalDate.of(2000, 1, 1)))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.UPDATE, + val sinkMessage1 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.UPDATE, + listOf("Person", "Employee"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + NodeState(listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe")), + NodeState( listOf("Person", "Employee"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - NodeState( - listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe")), - NodeState( - listOf("Person", "Employee"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 0, - 1)), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 0, + 1) + verify( + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage1), Query( "MERGE (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) SET n += ${'$'}nProps", mapOf( @@ -184,30 +189,30 @@ class CdcSourceIdHandlerTest { "nProps" to mapOf("name" to "john", "dob" to LocalDate.of(1990, 1, 1)))))))) - verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.UPDATE, + val sinkMessage2 = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.UPDATE, + listOf("Person", "Employee"), + mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), + NodeState( listOf("Person", "Employee"), - mapOf("Person" to listOf(mapOf("name" to "john", "surname" to "doe"))), - NodeState( - listOf("Person", "Employee"), - mapOf("name" to "joe", "surname" to "doe", "married" to true)), - NodeState( - listOf("Person", "Manager"), - mapOf( - "name" to "john", - "surname" to "doe", - "dob" to LocalDate.of(1990, 1, 1)))), - 0, - 1)), + mapOf("name" to "joe", "surname" to "doe", "married" to true)), + NodeState( + listOf("Person", "Manager"), + mapOf( + "name" to "john", "surname" to "doe", "dob" to LocalDate.of(1990, 1, 1)))), + 0, + 1) + verify( + listOf(sinkMessage2), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage2), Query( "MERGE (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) SET n += ${'$'}nProps SET n:`Manager` REMOVE n:`Employee`", mapOf( @@ -221,24 +226,25 @@ class CdcSourceIdHandlerTest { @Test fun `should generate correct statement for node deletion events`() { + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.DELETE, + emptyList(), + emptyMap(), + NodeState(emptyList(), mapOf("name" to "joe", "dob" to LocalDate.of(2000, 1, 1))), + null), + 0, + 1) verify( - listOf( - newChangeEventMessage( - NodeEvent( - "node-element-id", - EntityOperation.DELETE, - emptyList(), - emptyMap(), - NodeState( - emptyList(), mapOf("name" to "joe", "dob" to LocalDate.of(2000, 1, 1))), - null), - 0, - 1)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage), Query( "MATCH (n:`SourceEvent` {sourceElementId: ${'$'}nElementId}) DETACH DELETE n", mapOf("nElementId" to "node-element-id")))))) @@ -246,25 +252,27 @@ class CdcSourceIdHandlerTest { @Test fun `should generate correct statement for relationship creation events`() { + val sinkMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node("start-element-id", emptyList(), emptyMap()), + Node("end-element-id", emptyList(), emptyMap()), + emptyList(), + EntityOperation.CREATE, + null, + RelationshipState(mapOf("name" to "john", "surname" to "doe"))), + 0, + 1) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "REL", - Node("start-element-id", emptyList(), emptyMap()), - Node("end-element-id", emptyList(), emptyMap()), - emptyList(), - EntityOperation.CREATE, - null, - RelationshipState(mapOf("name" to "john", "surname" to "doe"))), - 0, - 1)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage), Query( "MERGE (start:`SourceEvent` {sourceElementId: ${'$'}startElementId}) " + "MERGE (end:`SourceEvent` {sourceElementId: ${'$'}endElementId}) " + @@ -279,25 +287,27 @@ class CdcSourceIdHandlerTest { @Test fun `should generate correct statement for relationship update events`() { + val sinkMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node("start-element-id", emptyList(), emptyMap()), + Node("end-element-id", emptyList(), emptyMap()), + emptyList(), + EntityOperation.UPDATE, + RelationshipState(emptyMap()), + RelationshipState(mapOf("name" to "john", "surname" to "doe"))), + 0, + 1) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "REL", - Node("start-element-id", emptyList(), emptyMap()), - Node("end-element-id", emptyList(), emptyMap()), - emptyList(), - EntityOperation.UPDATE, - RelationshipState(emptyMap()), - RelationshipState(mapOf("name" to "john", "surname" to "doe"))), - 0, - 1)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage), Query( "MERGE (start:`SourceEvent` {sourceElementId: ${'$'}startElementId}) " + "MERGE (end:`SourceEvent` {sourceElementId: ${'$'}endElementId}) " + @@ -309,25 +319,27 @@ class CdcSourceIdHandlerTest { "rElementId" to "rel-element-id", "rProps" to mapOf("name" to "john", "surname" to "doe"))))))) + val sinkMessage1 = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node("start-element-id", emptyList(), emptyMap()), + Node("end-element-id", emptyList(), emptyMap()), + emptyList(), + EntityOperation.UPDATE, + RelationshipState(mapOf("name" to "john", "surname" to "doe")), + RelationshipState(mapOf("name" to "joe", "surname" to "doe"))), + 0, + 1) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "REL", - Node("start-element-id", emptyList(), emptyMap()), - Node("end-element-id", emptyList(), emptyMap()), - emptyList(), - EntityOperation.UPDATE, - RelationshipState(mapOf("name" to "john", "surname" to "doe")), - RelationshipState(mapOf("name" to "joe", "surname" to "doe"))), - 0, - 1)), + listOf(sinkMessage1), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage1), Query( "MERGE (start:`SourceEvent` {sourceElementId: ${'$'}startElementId}) " + "MERGE (end:`SourceEvent` {sourceElementId: ${'$'}endElementId}) " + @@ -342,25 +354,27 @@ class CdcSourceIdHandlerTest { @Test fun `should generate correct statement for relationship deletion events`() { + val sinkMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node("start-element-id", emptyList(), emptyMap()), + Node("end-element-id", emptyList(), emptyMap()), + emptyList(), + EntityOperation.DELETE, + RelationshipState(emptyMap()), + null), + 0, + 1) verify( - listOf( - newChangeEventMessage( - RelationshipEvent( - "rel-element-id", - "REL", - Node("start-element-id", emptyList(), emptyMap()), - Node("end-element-id", emptyList(), emptyMap()), - emptyList(), - EntityOperation.DELETE, - RelationshipState(emptyMap()), - null), - 0, - 1)), + listOf(sinkMessage), listOf( listOf( ChangeQuery( 0, 1, + listOf(sinkMessage), Query( "MATCH ()-[r:`REL` {sourceElementId: ${'$'}rElementId}]->() DELETE r", mapOf("rElementId" to "rel-element-id")))))) @@ -368,7 +382,12 @@ class CdcSourceIdHandlerTest { @Test fun `should split changes into transactional boundaries`() { - val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") val result = handler.handle( @@ -423,6 +442,187 @@ class CdcSourceIdHandlerTest { }) } + @Test + fun `should fail on null 'after' field with node create operation`() { + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") + + val nodeChangeEventMessage = + newChangeEventMessage( + NodeEvent( + "person1", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + null, + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(nodeChangeEventMessage)) + } shouldHaveMessage "create operation requires 'after' field in the event object" + } + + @Test + fun `should fail on null 'after' field with relationship create operation`() { + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") + + val relationshipChangeEventMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.CREATE, + null, + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(relationshipChangeEventMessage)) + } shouldHaveMessage "create operation requires 'after' field in the event object" + } + + @Test + fun `should fail on null 'before' field with node update operation`() { + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") + + val nodeChangeEventMessage = + newChangeEventMessage( + NodeEvent( + "person1", + EntityOperation.UPDATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + null, + NodeState( + listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe"))), + 1, + 0) + + assertThrows { + handler.handle(listOf(nodeChangeEventMessage)) + } shouldHaveMessage "update operation requires 'before' field in the event object" + } + + @Test + fun `should fail on null 'before' field with relationship update operation`() { + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") + + val relationshipChangeEventMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.UPDATE, + null, + RelationshipState(mapOf("name" to "john", "surname" to "doe"))), + 1, + 0) + + assertThrows { + handler.handle(listOf(relationshipChangeEventMessage)) + } shouldHaveMessage "update operation requires 'before' field in the event object" + } + + @Test + fun `should fail on null 'after' field with node update operation`() { + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") + + val nodeChangeEventMessage = + newChangeEventMessage( + NodeEvent( + "person1", + EntityOperation.UPDATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L))), + NodeState(listOf("Person", "Employee"), mapOf("name" to "joe", "surname" to "doe")), + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(nodeChangeEventMessage)) + } shouldHaveMessage "update operation requires 'after' field in the event object" + } + + @Test + fun `should fail on null 'after' field with relationship update operation`() { + val handler = + CdcSourceIdHandler( + "my-topic", + Renderer.getRenderer(Configuration.defaultConfig()), + "SourceEvent", + "sourceElementId") + + val relationshipChangeEventMessage = + newChangeEventMessage( + RelationshipEvent( + "rel-element-id", + "REL", + Node( + "start-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 1L)))), + Node( + "end-element-id", + listOf("Person"), + mapOf("Person" to listOf(mapOf("id" to 2L)))), + emptyList(), + EntityOperation.UPDATE, + RelationshipState(mapOf("name" to "john", "surname" to "doe")), + null), + 1, + 0) + + assertThrows { + handler.handle(listOf(relationshipChangeEventMessage)) + } shouldHaveMessage "update operation requires 'after' field in the event object" + } + private fun verify(messages: Iterable, expected: Iterable>) { val handler = CdcSourceIdHandler( diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandlerTest.kt index 62b976f9..6a6aa8e2 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandlerTest.kt @@ -33,11 +33,10 @@ class CudHandlerTest : HandlerTest() { fun `should create node`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "node", "op": "create", @@ -47,12 +46,14 @@ class CudHandlerTest : HandlerTest() { "foo": "foo-value" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse("CREATE (n:`Foo`:`Bar` {}) SET n = ${'$'}properties") .cypher, @@ -63,11 +64,10 @@ class CudHandlerTest : HandlerTest() { fun `should create node without labels`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "node", "op": "create", @@ -76,12 +76,14 @@ class CudHandlerTest : HandlerTest() { "foo": "foo-value" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse("CREATE (n {}) SET n = ${'$'}properties").cypher, mapOf("properties" to mapOf("id" to 1, "foo" to "foo-value")))))) @@ -91,11 +93,10 @@ class CudHandlerTest : HandlerTest() { fun `should update node`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "node", "op": "UPDATE", @@ -108,12 +109,14 @@ class CudHandlerTest : HandlerTest() { "foo": "foo-value" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( "MATCH (n:`Foo`:`Bar` {id: ${'$'}keys.id}) SET n += ${'$'}properties") @@ -127,11 +130,10 @@ class CudHandlerTest : HandlerTest() { fun `should update node without labels`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "node", "op": "UPDATE", @@ -143,12 +145,14 @@ class CudHandlerTest : HandlerTest() { "foo": "foo-value" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( "MATCH (n {id: ${'$'}keys.id}) SET n += ${'$'}properties") @@ -162,11 +166,10 @@ class CudHandlerTest : HandlerTest() { fun `should merge node`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "NODE", "op": "merge", @@ -179,12 +182,14 @@ class CudHandlerTest : HandlerTest() { "foo": "foo-value" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( "MERGE (n:`Foo`:`Bar` {id: ${'$'}keys.id}) SET n += ${'$'}properties") @@ -198,11 +203,10 @@ class CudHandlerTest : HandlerTest() { fun `should merge node without labels`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "NODE", "op": "merge", @@ -214,12 +218,14 @@ class CudHandlerTest : HandlerTest() { "foo": "foo-value" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( "MERGE (n {id: ${'$'}keys.id}) SET n += ${'$'}properties") @@ -233,11 +239,10 @@ class CudHandlerTest : HandlerTest() { fun `should delete node`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "NODE", "op": "delete", @@ -247,12 +252,14 @@ class CudHandlerTest : HandlerTest() { }, "detach": true } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( "MATCH (n:`Foo`:`Bar` {id: ${'$'}keys.id}) DETACH DELETE n") @@ -264,11 +271,10 @@ class CudHandlerTest : HandlerTest() { fun `should delete node without labels`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "NODE", "op": "delete", @@ -277,12 +283,14 @@ class CudHandlerTest : HandlerTest() { }, "detach": true } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse("MATCH (n {id: ${'$'}keys.id}) DETACH DELETE n").cypher, mapOf("keys" to mapOf("id" to 0)))))) @@ -292,11 +300,10 @@ class CudHandlerTest : HandlerTest() { fun `should delete node without detach`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "NODE", "op": "delete", @@ -305,12 +312,14 @@ class CudHandlerTest : HandlerTest() { "id": 0 } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse("MATCH (n:`Foo`:`Bar` {id: ${'$'}keys.id}) DELETE n") .cypher, @@ -321,11 +330,10 @@ class CudHandlerTest : HandlerTest() { fun `should create relationship`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "create", @@ -346,12 +354,14 @@ class CudHandlerTest : HandlerTest() { "by": "incident" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -371,11 +381,10 @@ class CudHandlerTest : HandlerTest() { fun `should create relationship by merging nodes`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "create", @@ -398,12 +407,14 @@ class CudHandlerTest : HandlerTest() { "by": "incident" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -423,11 +434,10 @@ class CudHandlerTest : HandlerTest() { fun `should update relationship`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "update", @@ -449,12 +459,14 @@ class CudHandlerTest : HandlerTest() { "by": "incident" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -475,11 +487,10 @@ class CudHandlerTest : HandlerTest() { fun `should update relationship with ids`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "update", @@ -504,12 +515,14 @@ class CudHandlerTest : HandlerTest() { "by": "incident" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -530,11 +543,10 @@ class CudHandlerTest : HandlerTest() { fun `should merge relationship`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "merge", @@ -556,12 +568,14 @@ class CudHandlerTest : HandlerTest() { "by": "incident" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -582,11 +596,10 @@ class CudHandlerTest : HandlerTest() { fun `should merge relationship with ids`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "MERGE", @@ -611,12 +624,14 @@ class CudHandlerTest : HandlerTest() { "by": "incident" } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -637,11 +652,10 @@ class CudHandlerTest : HandlerTest() { fun `should delete relationship`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "delete", @@ -659,12 +673,14 @@ class CudHandlerTest : HandlerTest() { } } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -684,11 +700,10 @@ class CudHandlerTest : HandlerTest() { fun `should delete relationship with ids`() { val handler = CudHandler("my-topic", Renderer.getDefaultRenderer(), 100) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - """ + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + """ { "type": "relationship", "op": "DELETE", @@ -709,12 +724,14 @@ class CudHandlerTest : HandlerTest() { "id": 5 } } - """))) shouldBe + """) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -783,6 +800,7 @@ class CudHandlerTest : HandlerTest() { ChangeQuery( null, null, + listOf(messages[it]), when (it % modulo) { 0 -> Query( @@ -869,6 +887,7 @@ class CudHandlerTest : HandlerTest() { ChangeQuery( null, null, + listOf(messages[it]), when (it % modulo) { 0 -> Query( diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandlerTest.kt index 27e584ce..6ea76e68 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandlerTest.kt @@ -45,12 +45,14 @@ class CypherHandlerTest : HandlerTest() { "", true) - handler.handle(listOf(newMessage(Schema.STRING_SCHEMA, "{}"))) shouldBe + val sinkMessage = newMessage(Schema.STRING_SCHEMA, "{}") + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event CALL {WITH * CREATE (n:Node) SET n = event}", mapOf( @@ -79,12 +81,14 @@ class CypherHandlerTest : HandlerTest() { "__value", true) - handler.handle(listOf(newMessage(Schema.STRING_SCHEMA, "{}"))) shouldBe + val sinkMessage = newMessage(Schema.STRING_SCHEMA, "{}") + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __value}", mapOf( @@ -113,13 +117,14 @@ class CypherHandlerTest : HandlerTest() { "__value", true) - handler.handle( - listOf(newMessage(Schema.STRING_SCHEMA, "{}", Schema.INT64_SCHEMA, 32L))) shouldBe + val sinkMessage = newMessage(Schema.STRING_SCHEMA, "{}", Schema.INT64_SCHEMA, 32L) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __key}", mapOf( @@ -148,19 +153,20 @@ class CypherHandlerTest : HandlerTest() { "__value", true) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - "{}", - Schema.INT64_SCHEMA, - 32L, - listOf(ConnectHeader("age", SchemaAndValue(Schema.INT32_SCHEMA, 24)))))) shouldBe + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + "{}", + Schema.INT64_SCHEMA, + 32L, + listOf(ConnectHeader("age", SchemaAndValue(Schema.INT32_SCHEMA, 24)))) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header}", mapOf( @@ -189,19 +195,21 @@ class CypherHandlerTest : HandlerTest() { "__value", false) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - "{}", - Schema.INT64_SCHEMA, - 32L, - listOf(ConnectHeader("age", SchemaAndValue(Schema.INT32_SCHEMA, 24)))))) shouldBe + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + "{}", + Schema.INT64_SCHEMA, + 32L, + listOf(ConnectHeader("age", SchemaAndValue(Schema.INT32_SCHEMA, 24)))) + + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header}", mapOf( @@ -230,19 +238,20 @@ class CypherHandlerTest : HandlerTest() { "__value", false) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - "{}", - Schema.INT64_SCHEMA, - 32L, - listOf(ConnectHeader("age", SchemaAndValue(Schema.INT32_SCHEMA, 24)))))) shouldBe + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + "{}", + Schema.INT64_SCHEMA, + 32L, + listOf(ConnectHeader("age", SchemaAndValue(Schema.INT32_SCHEMA, 24)))) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header}", mapOf( @@ -288,7 +297,8 @@ class CypherHandlerTest : HandlerTest() { "__value", false) - val result = handler.handle((1..13).map { newMessage(Schema.INT64_SCHEMA, it.toLong()) }) + val messages = (1..13).map { newMessage(Schema.INT64_SCHEMA, it.toLong()) } + val result = handler.handle(messages) result shouldBe listOf( @@ -296,6 +306,7 @@ class CypherHandlerTest : HandlerTest() { ChangeQuery( null, null, + messages.slice(0..4), Query( "UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value}", mapOf( @@ -312,6 +323,7 @@ class CypherHandlerTest : HandlerTest() { ChangeQuery( null, null, + messages.slice(5..9), Query( "UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value}", mapOf( @@ -328,6 +340,7 @@ class CypherHandlerTest : HandlerTest() { ChangeQuery( null, null, + messages.slice(10..12), Query( "UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value}", mapOf( @@ -347,14 +360,14 @@ class CypherHandlerTest : HandlerTest() { val handler = CypherHandler("my-topic", "CREATE (n:Node) SET n = event", Renderer.getDefaultRenderer(), 1) - handler.handle( - listOf( - newMessage(Schema.STRING_SCHEMA, "{\"x\": 123, \"y\": [1,2,3], \"z\": true}"))) shouldBe + val sinkMessage = newMessage(Schema.STRING_SCHEMA, "{\"x\": 123, \"y\": [1,2,3], \"z\": true}") + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}", mapOf( @@ -378,18 +391,15 @@ class CypherHandlerTest : HandlerTest() { val handler = CypherHandler("my-topic", "CREATE (n:Node) SET n = event", Renderer.getDefaultRenderer(), 1) - handler.handle( - listOf( - newMessage( - null, - null, - Schema.STRING_SCHEMA, - "{\"x\": 123, \"y\": [1,2,3], \"z\": true}"))) shouldBe + val sinkMessage = + newMessage(null, null, Schema.STRING_SCHEMA, "{\"x\": 123, \"y\": [1,2,3], \"z\": true}") + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}", mapOf( @@ -410,25 +420,25 @@ class CypherHandlerTest : HandlerTest() { val handler = CypherHandler("my-topic", "CREATE (n:Node) SET n = event", Renderer.getDefaultRenderer(), 1) - handler.handle( - listOf( - newMessage( - null, - null, - Schema.INT64_SCHEMA, - 32L, - listOf( - ConnectHeader("number", SchemaAndValue(Schema.INT32_SCHEMA, 24)), - ConnectHeader( - "test", - SchemaAndValue( - Schema.STRING_SCHEMA, - "{\"x\": 123, \"y\": [1,2,3], \"z\": true}")))))) shouldBe + val sinkMessage = + newMessage( + null, + null, + Schema.INT64_SCHEMA, + 32L, + listOf( + ConnectHeader("number", SchemaAndValue(Schema.INT32_SCHEMA, 24)), + ConnectHeader( + "test", + SchemaAndValue( + Schema.STRING_SCHEMA, "{\"x\": 123, \"y\": [1,2,3], \"z\": true}")))) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}", mapOf( @@ -455,19 +465,20 @@ class CypherHandlerTest : HandlerTest() { val handler = CypherHandler("my-topic", "CREATE (n:Node) SET n = event", Renderer.getDefaultRenderer(), 1) - handler.handle( - listOf( - newMessage( - Schema.STRING_SCHEMA, - "{]", - Schema.BYTES_SCHEMA, - "{a: b}".toByteArray(), - listOf(ConnectHeader("test", SchemaAndValue(Schema.STRING_SCHEMA, "b")))))) shouldBe + val sinkMessage = + newMessage( + Schema.STRING_SCHEMA, + "{]", + Schema.BYTES_SCHEMA, + "{a: b}".toByteArray(), + listOf(ConnectHeader("test", SchemaAndValue(Schema.STRING_SCHEMA, "b")))) + handler.handle(listOf(sinkMessage)) shouldBe listOf( listOf( ChangeQuery( null, null, + listOf(sinkMessage), Query( "UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}", mapOf( diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt index 4d644955..5cb776e3 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt @@ -17,6 +17,7 @@ package org.neo4j.connectors.kafka.sink.strategy import io.kotest.matchers.shouldBe +import io.kotest.matchers.throwable.shouldHaveMessage import java.time.Instant import java.time.LocalDate import java.time.ZoneOffset @@ -24,8 +25,10 @@ import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.neo4j.connectors.kafka.data.DynamicTypes import org.neo4j.connectors.kafka.data.SimpleTypes +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.cypherdsl.core.renderer.Renderer import org.neo4j.cypherdsl.parser.CypherParser @@ -633,6 +636,42 @@ class NodePatternHandlerTest : HandlerTest() { )))) } + @Test + fun `should fail when the key is not located in the message`() { + assertThrowsHandler( + pattern = "(:Person{!id, !secondary_id, name, surname})", + key = """{}""", + value = """{"name": "John", "surname": "Doe"}""", + message = "Key 'id' could not be located in the message.") + } + + @Test + fun `should fail when explicit key is not located in the keys`() { + assertThrowsHandler( + pattern = "(:Person{!id: __key.old_id, name, surname})", + key = """{}""", + value = """{"name": "John", "surname": "Doe"}""", + message = "Key 'old_id' could not be located in the keys.") + } + + @Test + fun `should fail when explicit key is not located in the values`() { + assertThrowsHandler( + pattern = "(:Person{!id: __value.old_id, name, surname})", + key = """{}""", + value = """{"name": "John", "surname": "Doe"}""", + message = "Key 'old_id' could not be located in the values.") + } + + @Test + fun `should fail when the key is not located in the keys with composite key pattern`() { + assertThrowsHandler( + pattern = "(:Person{!id, !second_id, name, surname})", + key = """{"id": 1}""", + value = """{"name": "John", "surname": "Doe"}""", + message = "Key 'second_id' could not be located in the message.") + } + private fun assertQueryAndParameters( pattern: String, keySchema: Schema = Schema.STRING_SCHEMA, @@ -642,9 +681,10 @@ class NodePatternHandlerTest : HandlerTest() { expected: List> = emptyList() ) { val handler = NodePatternHandler("my-topic", pattern, false, Renderer.getDefaultRenderer(), 1) + val sinkMessage = newMessage(valueSchema, value, keySchema = keySchema, key = key) handler.handle( listOf( - newMessage(valueSchema, value, keySchema = keySchema, key = key), + sinkMessage, ), ) shouldBe listOf( @@ -652,6 +692,7 @@ class NodePatternHandlerTest : HandlerTest() { ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -685,4 +726,22 @@ class NodePatternHandlerTest : HandlerTest() { ), ) } + + private inline fun assertThrowsHandler( + pattern: String, + keySchema: Schema = Schema.STRING_SCHEMA, + key: Any? = null, + valueSchema: Schema = Schema.STRING_SCHEMA, + value: Any? = null, + message: String? = null + ) { + val handler = NodePatternHandler("my-topic", pattern, false, Renderer.getDefaultRenderer(), 1) + val sinkMessage = newMessage(valueSchema, value, keySchema = keySchema, key = key) + + if (message != null) { + assertThrows { handler.handle(listOf(sinkMessage)) } shouldHaveMessage message + } else { + assertThrows { handler.handle(listOf(sinkMessage)) } + } + } } diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt index ffd04359..20df2c16 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt @@ -17,12 +17,15 @@ package org.neo4j.connectors.kafka.sink.strategy import io.kotest.matchers.shouldBe +import io.kotest.matchers.throwable.shouldHaveMessage import java.time.Instant import java.time.ZoneOffset import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.cypherdsl.core.renderer.Renderer import org.neo4j.cypherdsl.parser.CypherParser @@ -825,6 +828,78 @@ class RelationshipPatternHandlerTest : HandlerTest() { "keys" to mapOf("id" to 3))))) } + @Test + fun `should fail when the source node key is not located in the message`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: end})", + key = """{"rel_id": 1, "end": 1}""", + message = "Key 'start' could not be located in the message.") + } + + @Test + fun `should fail when the relationship key is not located in the message`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: end})", + key = """{"start": 1, "end": 1}""", + message = "Key 'rel_id' could not be located in the message.") + } + + @Test + fun `should fail when the target node key is not located in the message`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: end})", + key = """{"start": 1, "rel_id": 1}""", + message = "Key 'end' could not be located in the message.") + } + + @Test + fun `should fail when the explicit source node key is not located in the keys`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: __key.start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: end})", + key = """{"rel_id": 1, "end": 1}""", + message = "Key 'start' could not be located in the keys.") + } + + @Test + fun `should fail when the explicit relationship key is not located in the keys`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: __key.start})-[:REL_TYPE{!id: __key.rel_id}]->(:LabelB{!id: end})", + key = """{"start": 1, "end": 1}""", + message = "Key 'rel_id' could not be located in the keys.") + } + + @Test + fun `should fail when the explicit target node key is not located in the keys`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: __key.start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: __key.end})", + key = """{"start": 1, "rel_id": 1}""", + message = "Key 'end' could not be located in the keys.") + } + + @Test + fun `should fail when the explicit source node key is not located in the values`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: __value.start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: end})", + value = """{"rel_id": 1, "end": 1}""", + message = "Key 'start' could not be located in the values.") + } + + @Test + fun `should fail when the explicit relationship key is not located in the values`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: start})-[:REL_TYPE{!id: __value.rel_id}]->(:LabelB{!id: end})", + value = """{"start": 1, "end": 1}""", + message = "Key 'rel_id' could not be located in the values.") + } + + @Test + fun `should fail when the explicit target node key is not located in the values`() { + assertThrowsHandler( + pattern = "(:LabelA{!id: start})-[:REL_TYPE{!id: rel_id}]->(:LabelB{!id: __value.end})", + value = """{"start": 1, "rel_id": 1}""", + message = "Key 'end' could not be located in the values.") + } + private fun assertQueryAndParameters( pattern: String, keySchema: Schema = Schema.STRING_SCHEMA, @@ -843,9 +918,10 @@ class RelationshipPatternHandlerTest : HandlerTest() { mergeRelationshipProperties = mergeRelationshipProperties, renderer = Renderer.getDefaultRenderer(), batchSize = 1) + val sinkMessage = newMessage(valueSchema, value, keySchema = keySchema, key = key) handler.handle( listOf( - newMessage(valueSchema, value, keySchema = keySchema, key = key), + sinkMessage, ), ) shouldBe listOf( @@ -853,6 +929,7 @@ class RelationshipPatternHandlerTest : HandlerTest() { ChangeQuery( null, null, + listOf(sinkMessage), Query( CypherParser.parse( """ @@ -891,4 +968,29 @@ class RelationshipPatternHandlerTest : HandlerTest() { ), ) } + + private inline fun assertThrowsHandler( + pattern: String, + keySchema: Schema = Schema.STRING_SCHEMA, + key: Any? = null, + valueSchema: Schema = Schema.STRING_SCHEMA, + value: Any? = null, + message: String? = null + ) { + val handler = + RelationshipPatternHandler( + "my-topic", + pattern, + mergeNodeProperties = false, + mergeRelationshipProperties = false, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + val sinkMessage = newMessage(valueSchema, value, keySchema = keySchema, key = key) + + if (message != null) { + assertThrows { handler.handle(listOf(sinkMessage)) } shouldHaveMessage message + } else { + assertThrows { handler.handle(listOf(sinkMessage)) } + } + } } diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConsumerResolver.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConsumerResolver.kt new file mode 100644 index 00000000..01e69d3a --- /dev/null +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConsumerResolver.kt @@ -0,0 +1,91 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.testing.kafka + +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig +import java.util.* +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.junit.jupiter.api.extension.ExtensionContext +import org.junit.jupiter.api.extension.ParameterContext +import org.neo4j.connectors.kafka.testing.format.KeyValueConverterResolver +import org.neo4j.connectors.kafka.testing.source.TopicConsumer + +internal class ConsumerResolver( + private val keyValueConverterResolver: KeyValueConverterResolver, + private val topicRegistry: TopicRegistry, + private val brokerExternalHostProvider: () -> String, + private val schemaControlRegistryExternalUriProvider: () -> String, + private val consumerFactory: (Properties, String) -> KafkaConsumer<*, *> +) { + + fun resolveGenericConsumer( + parameterContext: ParameterContext?, + context: ExtensionContext? + ): ConvertingKafkaConsumer { + val kafkaConsumer = resolveConsumer(parameterContext, context) + return ConvertingKafkaConsumer( + keyConverter = keyValueConverterResolver.resolveKeyConverter(context), + valueConverter = keyValueConverterResolver.resolveValueConverter(context), + kafkaConsumer = kafkaConsumer) + } + + private fun resolveConsumer( + parameterContext: ParameterContext?, + extensionContext: ExtensionContext? + ): KafkaConsumer<*, *> { + val consumerAnnotation = parameterContext?.parameter?.getAnnotation(TopicConsumer::class.java)!! + val topic = topicRegistry.resolveTopic(consumerAnnotation.topic) + val properties = Properties() + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + brokerExternalHostProvider(), + ) + val keyConverter = keyValueConverterResolver.resolveKeyConverter(extensionContext) + val valueConverter = keyValueConverterResolver.resolveValueConverter(extensionContext) + if (keyConverter.supportsSchemaRegistry || valueConverter.supportsSchemaRegistry) { + properties.setProperty( + KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, + schemaControlRegistryExternalUriProvider(), + ) + } + properties.setProperty( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + keyConverter.deserializerClass.name, + ) + properties.setProperty( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + valueConverter.deserializerClass.name, + ) + properties.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, + // note: ExtensionContext#getUniqueId() returns null in the CLI + "${topic}@${extensionContext?.testClass ?: ""}#${extensionContext?.displayName}", + ) + + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAnnotation.offset) + return consumerFactory(properties, topic) + } + + companion object { + internal fun getSubscribedConsumer(properties: Properties, topic: String): KafkaConsumer<*, *> { + val consumer = KafkaConsumer(properties) + consumer.subscribe(listOf(topic)) + return consumer + } + } +} diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConvertingKafkaProducer.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConvertingKafkaProducer.kt index f6cde967..99e16080 100644 --- a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConvertingKafkaProducer.kt +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ConvertingKafkaProducer.kt @@ -128,6 +128,24 @@ data class ConvertingKafkaProducer( headers = Headers.from(event).associate { it.key() to it.value() }) } + fun publish(vararg events: ChangeEvent) { + val kafkaMessages = mutableListOf() + + events.forEach { + val connectValue = it.toConnectValue() + kafkaMessages.add( + KafkaMessage( + keySchema = connectValue.schema(), + key = connectValue.value(), + valueSchema = connectValue.schema(), + value = connectValue.value(), + timestamp = it.metadata.txCommitTime.toInstant(), + headers = Headers.from(it).associate { header -> header.key() to header.value() })) + } + + publish(*kafkaMessages.toTypedArray()) + } + fun publish(event: StreamsTransactionEvent) { publish(valueSchema = Schema.STRING_SCHEMA, value = JSONUtils.writeValueAsString(event)) } diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ProducerResolver.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ProducerResolver.kt new file mode 100644 index 00000000..5f17a85b --- /dev/null +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/kafka/ProducerResolver.kt @@ -0,0 +1,85 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.testing.kafka + +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig +import java.net.URI +import java.util.* +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig +import org.junit.jupiter.api.extension.ExtensionContext +import org.junit.jupiter.api.extension.ParameterContext +import org.neo4j.connectors.kafka.testing.format.KeyValueConverterResolver +import org.neo4j.connectors.kafka.testing.sink.SchemaCompatibilityMode +import org.neo4j.connectors.kafka.testing.sink.TopicProducer + +class ProducerResolver( + private val keyValueConverterResolver: KeyValueConverterResolver, + private val topicRegistry: TopicRegistry, + private val brokerExternalHostProvider: () -> String, + private val schemaControlRegistryExternalUriProvider: () -> String, + private val schemaControlKeyCompatibilityProvider: () -> SchemaCompatibilityMode, + private val schemaControlValueCompatibilityProvider: () -> SchemaCompatibilityMode +) { + + fun resolveGenericProducer( + parameterContext: ParameterContext?, + extensionContext: ExtensionContext? + ): Any { + val producerAnnotation = parameterContext?.parameter?.getAnnotation(TopicProducer::class.java)!! + return ConvertingKafkaProducer( + schemaRegistryURI = URI(schemaControlRegistryExternalUriProvider()), + keyConverter = keyValueConverterResolver.resolveKeyConverter(extensionContext), + keyCompatibilityMode = schemaControlKeyCompatibilityProvider(), + valueConverter = keyValueConverterResolver.resolveValueConverter(extensionContext), + valueCompatibilityMode = schemaControlValueCompatibilityProvider(), + kafkaProducer = resolveProducer(parameterContext, extensionContext), + topic = topicRegistry.resolveTopic(producerAnnotation.topic)) + } + + private fun resolveProducer( + @Suppress("UNUSED_PARAMETER") parameterContext: ParameterContext?, + extensionContext: ExtensionContext? + ): KafkaProducer { + val properties = Properties() + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + brokerExternalHostProvider(), + ) + val keyConverter = keyValueConverterResolver.resolveKeyConverter(extensionContext) + val valueConverter = keyValueConverterResolver.resolveValueConverter(extensionContext) + if (keyConverter.supportsSchemaRegistry || valueConverter.supportsSchemaRegistry) { + properties.setProperty( + KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, + schemaControlRegistryExternalUriProvider(), + ) + } + properties.setProperty( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + keyConverter.serializerClass.name, + ) + properties.setProperty( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + valueConverter.serializerClass.name, + ) + properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()) + val producer = KafkaProducer(properties) + producer.initTransactions() + return producer + } +} diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt index 03f83f1d..bbcc0017 100644 --- a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt @@ -41,6 +41,9 @@ annotation class Neo4jSink( val nodePattern: Array = [], val relationshipPattern: Array = [], val cud: Array = [], + val errorTolerance: String = "all", + val errorDlqTopic: String = "", + val enableErrorHeaders: Boolean = false, ) enum class SchemaCompatibilityMode { diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt index 30046770..ea9ac1a9 100644 --- a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt @@ -16,13 +16,8 @@ */ package org.neo4j.connectors.kafka.testing.sink -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig -import java.net.URI import java.util.* import kotlin.jvm.optionals.getOrNull -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig import org.junit.jupiter.api.extension.AfterEachCallback import org.junit.jupiter.api.extension.BeforeEachCallback import org.junit.jupiter.api.extension.ConditionEvaluationResult @@ -37,7 +32,10 @@ import org.neo4j.connectors.kafka.testing.DatabaseSupport.createDatabase import org.neo4j.connectors.kafka.testing.DatabaseSupport.dropDatabase import org.neo4j.connectors.kafka.testing.ParameterResolvers import org.neo4j.connectors.kafka.testing.format.KeyValueConverterResolver +import org.neo4j.connectors.kafka.testing.kafka.ConsumerResolver +import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaConsumer import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaProducer +import org.neo4j.connectors.kafka.testing.kafka.ProducerResolver import org.neo4j.connectors.kafka.testing.kafka.TopicRegistry import org.neo4j.driver.AuthToken import org.neo4j.driver.AuthTokens @@ -61,6 +59,7 @@ internal class Neo4jSinkExtension( mapOf( Session::class.java to ::resolveSession, ConvertingKafkaProducer::class.java to ::resolveGenericProducer, + ConvertingKafkaConsumer::class.java to ::resolveGenericConsumer, )) private lateinit var sinkAnnotation: Neo4jSink @@ -93,6 +92,10 @@ internal class Neo4jSinkExtension( private val neo4jPassword = AnnotationValueResolver(Neo4jSink::neo4jPassword, envAccessor) + private val errorTolerance = AnnotationValueResolver(Neo4jSink::errorTolerance, envAccessor) + + private val errorDlqTopic = AnnotationValueResolver(Neo4jSink::errorDlqTopic, envAccessor) + private val mandatorySettings = listOf( kafkaConnectExternalUri, @@ -106,6 +109,29 @@ internal class Neo4jSinkExtension( private val keyValueConverterResolver = KeyValueConverterResolver() + private val producerResolver = + ProducerResolver( + keyValueConverterResolver, + topicRegistry, + brokerExternalHostProvider = { brokerExternalHost.resolve(sinkAnnotation) }, + schemaControlRegistryExternalUriProvider = { + schemaControlRegistryExternalUri.resolve(sinkAnnotation) + }, + schemaControlKeyCompatibilityProvider = { sinkAnnotation.schemaControlKeyCompatibility }, + schemaControlValueCompatibilityProvider = { + sinkAnnotation.schemaControlValueCompatibility + }) + + private val consumerResolver = + ConsumerResolver( + keyValueConverterResolver, + topicRegistry, + brokerExternalHostProvider = { brokerExternalHost.resolve(sinkAnnotation) }, + schemaControlRegistryExternalUriProvider = { + schemaControlRegistryExternalUri.resolve(sinkAnnotation) + }, + consumerFactory = ConsumerResolver::getSubscribedConsumer) + override fun evaluateExecutionCondition(context: ExtensionContext?): ConditionEvaluationResult { val metadata = AnnotationSupport.findAnnotation(context) @@ -219,7 +245,10 @@ internal class Neo4jSinkExtension( keyConverter = keyValueConverterResolver.resolveKeyConverter(context), valueConverter = keyValueConverterResolver.resolveValueConverter(context), topics = topics.distinct(), - strategies = strategies) + strategies = strategies, + errorTolerance = errorTolerance.resolve(sinkAnnotation), + errorDlqTopic = topicRegistry.resolveTopic(errorDlqTopic.resolve(sinkAnnotation)), + enableErrorHeaders = sinkAnnotation.enableErrorHeaders) sink.register(kafkaConnectExternalUri.resolve(sinkAnnotation)) topicRegistry.log() } @@ -294,49 +323,17 @@ internal class Neo4jSinkExtension( } } - private fun resolveProducer( - @Suppress("UNUSED_PARAMETER") parameterContext: ParameterContext?, + private fun resolveGenericProducer( + parameterContext: ParameterContext?, extensionContext: ExtensionContext? - ): KafkaProducer { - val properties = Properties() - properties.setProperty( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - brokerExternalHost.resolve(sinkAnnotation), - ) - val keyConverter = keyValueConverterResolver.resolveKeyConverter(extensionContext) - val valueConverter = keyValueConverterResolver.resolveValueConverter(extensionContext) - if (keyConverter.supportsSchemaRegistry || valueConverter.supportsSchemaRegistry) { - properties.setProperty( - KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, - schemaControlRegistryExternalUri.resolve(sinkAnnotation), - ) - } - properties.setProperty( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - keyConverter.serializerClass.name, - ) - properties.setProperty( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - valueConverter.serializerClass.name, - ) - properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()) - val producer = KafkaProducer(properties) - producer.initTransactions() - return producer + ): Any { + return producerResolver.resolveGenericProducer(parameterContext, extensionContext) } - private fun resolveGenericProducer( + private fun resolveGenericConsumer( parameterContext: ParameterContext?, extensionContext: ExtensionContext? ): Any { - val producerAnnotation = parameterContext?.parameter?.getAnnotation(TopicProducer::class.java)!! - return ConvertingKafkaProducer( - schemaRegistryURI = URI(schemaControlRegistryExternalUri.resolve(sinkAnnotation)), - keyConverter = keyValueConverterResolver.resolveKeyConverter(extensionContext), - keyCompatibilityMode = sinkAnnotation.schemaControlKeyCompatibility, - valueConverter = keyValueConverterResolver.resolveValueConverter(extensionContext), - valueCompatibilityMode = sinkAnnotation.schemaControlValueCompatibility, - kafkaProducer = resolveProducer(parameterContext, extensionContext), - topic = topicRegistry.resolveTopic(producerAnnotation.topic)) + return consumerResolver.resolveGenericConsumer(parameterContext, extensionContext) } } diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistration.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistration.kt index 3fe5bb3b..76211710 100644 --- a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistration.kt +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistration.kt @@ -31,6 +31,8 @@ internal class Neo4jSinkRegistration( retryTimeout: Duration = (-1).milliseconds, retryMaxDelay: Duration = 1000.milliseconds, errorTolerance: String = "all", + errorDlqTopic: String = "", + enableErrorHeaders: Boolean = false, enableErrorLog: Boolean = true, includeMessagesInErrorLog: Boolean = true, schemaControlRegistryUri: String, @@ -43,6 +45,10 @@ internal class Neo4jSinkRegistration( private val name: String = randomizedName("Neo4jSinkConnector") private val payload: Map + companion object { + private const val DLQ_TOPIC_REPLICATION_FACTOR = 1 + } + init { payload = mutableMapOf( @@ -56,6 +62,13 @@ internal class Neo4jSinkRegistration( put("errors.retry.timeout", retryTimeout.inWholeMilliseconds) put("errors.retry.delay.max.ms", retryMaxDelay.inWholeMilliseconds) put("errors.tolerance", errorTolerance) + if (errorDlqTopic.trim().isNotEmpty()) { + put("errors.deadletterqueue.topic.name", errorDlqTopic) + put( + "errors.deadletterqueue.topic.replication.factor", + DLQ_TOPIC_REPLICATION_FACTOR) + } + put("errors.deadletterqueue.context.headers.enable", enableErrorHeaders) put("errors.log.enable", enableErrorLog) put("errors.log.include.messages", includeMessagesInErrorLog) put("neo4j.uri", neo4jUri) diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt index 00b188af..16a35d90 100644 --- a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt +++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt @@ -16,11 +16,9 @@ */ package org.neo4j.connectors.kafka.testing.source -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig import java.util.* import kotlin.jvm.optionals.getOrNull import kotlin.reflect.KProperty1 -import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer import org.junit.jupiter.api.extension.AfterEachCallback import org.junit.jupiter.api.extension.BeforeEachCallback @@ -37,6 +35,7 @@ import org.neo4j.connectors.kafka.testing.DatabaseSupport.dropDatabase import org.neo4j.connectors.kafka.testing.DatabaseSupport.enableCdc import org.neo4j.connectors.kafka.testing.ParameterResolvers import org.neo4j.connectors.kafka.testing.format.KeyValueConverterResolver +import org.neo4j.connectors.kafka.testing.kafka.ConsumerResolver import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaConsumer import org.neo4j.connectors.kafka.testing.kafka.TopicRegistry import org.neo4j.driver.AuthToken @@ -52,8 +51,9 @@ internal class Neo4jSourceExtension( // visible for testing envAccessor: (String) -> String? = System::getenv, private val driverFactory: (String, AuthToken) -> Driver = GraphDatabase::driver, - private val consumerFactory: (Properties, String) -> KafkaConsumer<*, *> = - ::getSubscribedConsumer, + consumerFactory: (Properties, String) -> KafkaConsumer<*, *> = { properties, topic -> + ConsumerResolver.getSubscribedConsumer(properties, topic) + }, ) : ExecutionCondition, BeforeEachCallback, AfterEachCallback, ParameterResolver { private val log: Logger = LoggerFactory.getLogger(Neo4jSourceExtension::class.java) @@ -109,6 +109,16 @@ internal class Neo4jSourceExtension( private val keyValueConverterResolver = KeyValueConverterResolver() + private val consumerResolver = + ConsumerResolver( + keyValueConverterResolver, + topicRegistry, + brokerExternalHostProvider = { brokerExternalHost.resolve(sourceAnnotation) }, + schemaControlRegistryExternalUriProvider = { + schemaControlRegistryExternalUri.resolve(sourceAnnotation) + }, + consumerFactory) + override fun evaluateExecutionCondition(context: ExtensionContext?): ConditionEvaluationResult { val metadata = AnnotationSupport.findAnnotation(context) @@ -171,43 +181,6 @@ internal class Neo4jSourceExtension( } } - private fun resolveConsumer( - parameterContext: ParameterContext?, - extensionContext: ExtensionContext? - ): KafkaConsumer<*, *> { - val consumerAnnotation = parameterContext?.parameter?.getAnnotation(TopicConsumer::class.java)!! - val topic = topicRegistry.resolveTopic(consumerAnnotation.topic) - val properties = Properties() - properties.setProperty( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - brokerExternalHost.resolve(sourceAnnotation), - ) - val keyConverter = keyValueConverterResolver.resolveKeyConverter(extensionContext) - val valueConverter = keyValueConverterResolver.resolveValueConverter(extensionContext) - if (keyConverter.supportsSchemaRegistry || valueConverter.supportsSchemaRegistry) { - properties.setProperty( - KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, - schemaControlRegistryExternalUri.resolve(sourceAnnotation), - ) - } - properties.setProperty( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - keyConverter.deserializerClass.name, - ) - properties.setProperty( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - valueConverter.deserializerClass.name, - ) - properties.setProperty( - ConsumerConfig.GROUP_ID_CONFIG, - // note: ExtensionContext#getUniqueId() returns null in the CLI - "${topic}@${extensionContext?.testClass ?: ""}#${extensionContext?.displayName}", - ) - - properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAnnotation.offset) - return consumerFactory(properties, topic) - } - private fun resolveSession( @Suppress("UNUSED_PARAMETER") parameterContext: ParameterContext?, extensionContext: ExtensionContext? @@ -250,11 +223,7 @@ internal class Neo4jSourceExtension( parameterContext: ParameterContext?, context: ExtensionContext? ): ConvertingKafkaConsumer { - val kafkaConsumer = resolveConsumer(parameterContext, context) - return ConvertingKafkaConsumer( - keyConverter = keyValueConverterResolver.resolveKeyConverter(context), - valueConverter = keyValueConverterResolver.resolveValueConverter(context), - kafkaConsumer = kafkaConsumer) + return consumerResolver.resolveGenericConsumer(parameterContext, context) } private fun CdcSource.paramAsMap( @@ -293,14 +262,6 @@ internal class Neo4jSourceExtension( .mapValues { entry -> entry.value.map { it.keySerialization }.single() } } - companion object { - private fun getSubscribedConsumer(properties: Properties, topic: String): KafkaConsumer<*, *> { - val consumer = KafkaConsumer(properties) - consumer.subscribe(listOf(topic)) - return consumer - } - } - override fun supportsParameter( parameterContext: ParameterContext?, extensionContext: ExtensionContext? diff --git a/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistrationTest.kt b/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistrationTest.kt index 991ab4ea..f086126d 100644 --- a/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistrationTest.kt +++ b/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistrationTest.kt @@ -36,6 +36,7 @@ class Neo4jSinkRegistrationTest { "errors.retry.timeout" to -1L, "errors.retry.delay.max.ms" to 1000L, "errors.tolerance" to "all", + "errors.deadletterqueue.context.headers.enable" to false, "errors.log.enable" to true, "errors.log.include.messages" to true, "neo4j.uri" to "neo4j://example.com", @@ -62,6 +63,49 @@ class Neo4jSinkRegistrationTest { assertEquals(expectedConfig, payload["config"]) } + @Test + fun `creates payload with dlq topic`() { + val expectedConfig = + mapOf( + "topics" to "my-topic", + "connector.class" to "org.neo4j.connectors.kafka.sink.Neo4jConnector", + "key.converter" to "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url" to "http://example.com", + "value.converter" to "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url" to "http://example.com", + "errors.retry.timeout" to -1L, + "errors.retry.delay.max.ms" to 1000L, + "errors.tolerance" to "all", + "errors.deadletterqueue.topic.name" to "dlq-topic", + "errors.deadletterqueue.topic.replication.factor" to 1, + "errors.deadletterqueue.context.headers.enable" to false, + "errors.log.enable" to true, + "errors.log.include.messages" to true, + "neo4j.uri" to "neo4j://example.com", + "neo4j.database" to "database", + "neo4j.authentication.type" to "BASIC", + "neo4j.authentication.basic.username" to "user", + "neo4j.authentication.basic.password" to "password", + "neo4j.cypher.topic.my-topic" to "MERGE ()") + val registration = + Neo4jSinkRegistration( + neo4jUri = "neo4j://example.com", + neo4jUser = "user", + neo4jPassword = "password", + neo4jDatabase = "database", + schemaControlRegistryUri = "http://example.com", + keyConverter = KafkaConverter.AVRO, + valueConverter = KafkaConverter.AVRO, + errorDlqTopic = "dlq-topic", + topics = listOf("my-topic"), + strategies = mapOf("neo4j.cypher.topic.my-topic" to "MERGE ()")) + + val payload = registration.getPayload() + + assertTrue((payload["name"] as String).startsWith("Neo4jSinkConnector")) + assertEquals(expectedConfig, payload["config"]) + } + @Test fun `creates payload with multiple topics`() { val registration =