From 82e555e6969580e7de081046429efc585a50a70b Mon Sep 17 00:00:00 2001 From: dhrudevalia Date: Thu, 3 Oct 2024 17:52:21 +0100 Subject: [PATCH 1/4] fix: filter null 'key' values in streams node change events --- .../data/StreamsTransactionEventExtensions.kt | 7 +- .../Neo4jCdcSchemaFromStreamsMessageIT.kt | 204 ++++++++++++++++++ 2 files changed, 210 insertions(+), 1 deletion(-) diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt index 45a2b1eaa..09d3287e0 100644 --- a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt @@ -70,8 +70,13 @@ object StreamsTransactionEventExtensions { this.schema.constraints .filter { c -> c.type == StreamsConstraintType.UNIQUE } .map { c -> - c.label!! to c.properties.associateWith { referenceState.properties[it] } + c.label!! to + c.properties + .associateWith { referenceState.properties[it] } + // Does not have values associated in the schema + .filterValues { it != null } } + .filter { it.second.isNotEmpty() } .groupBy { it.first } .mapValues { it.value.map { p -> p.second } } diff --git a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt index 6572c8fee..c31840554 100644 --- a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt +++ b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt @@ -24,6 +24,7 @@ import java.time.LocalDate import kotlin.time.Duration.Companion.seconds import org.junit.jupiter.api.Test import org.neo4j.connectors.kafka.events.Constraint +import org.neo4j.connectors.kafka.events.EntityType import org.neo4j.connectors.kafka.events.Meta import org.neo4j.connectors.kafka.events.NodeChange import org.neo4j.connectors.kafka.events.NodePayload @@ -158,6 +159,209 @@ class Neo4jCdcSchemaFromStreamsMessageIT { } } + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should create a node with a null unique constraint property value`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a creation event + // with a unique constraint referencing a property that doesn't exist + val event = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.created), + payload = + NodePayload( + id = "1", + type = EntityType.node, + before = null, + after = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ), + listOf("Person")), + ), + schema = + Schema( + properties = + mapOf( + "first_name" to "String", + "last_name" to "String", + "email" to "String", + ), + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the event is published + producer.publish(event) + + // then a new node should exist + eventually(30.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .list() + + result shouldHaveSize 1 + } + } + + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should delete a node with a null unique constraint property value`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a database containing 1 node + session + .run( + "CREATE (n:Person) SET n = ${'$'}props", + mapOf( + "props" to + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ))) + .consume() + + // and a deletion event and with a unique constraint referencing a property that doesn't exist + val event = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.deleted), + payload = + NodePayload( + id = "1", + type = EntityType.node, + after = null, + before = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ), + listOf("Person")), + ), + schema = + Schema( + properties = + mapOf( + "first_name" to "String", + "last_name" to "String", + "email" to "String", + ), + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the event is published + producer.publish(event) + + // then the node should no longer exist + eventually(10.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .list() + + result shouldHaveSize 0 + } + } + + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should fail to delete a node when no valid unique constraints are provided`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a database containing 1 node + session + .run( + "CREATE (n:Person) SET n = ${'$'}props", + mapOf( + "props" to + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ))) + .consume() + + // and a deletion event and with a multiple unique constraints which do not have a valid + // property + val event = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.deleted), + payload = + NodePayload( + id = "1", + type = EntityType.node, + after = null, + before = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + ), + listOf("Person")), + ), + schema = + Schema( + properties = + mapOf( + "first_name" to "String", + "last_name" to "String", + ), + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the event is published + producer.publish(event) + + // then the node should not be deleted and should still exist + eventually(10.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .list() + + result shouldHaveSize 1 + } + } + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) @Test fun `should create relationship`( From 9ffdbc56f36a906eaf86dd9d9c37c320f921a0e7 Mon Sep 17 00:00:00 2001 From: dhrudevalia Date: Thu, 3 Oct 2024 17:52:57 +0100 Subject: [PATCH 2/4] fix: validate as least 1 key exists in node transformation --- .../kafka/sink/strategy/CdcSchemaHandler.kt | 10 +- .../sink/strategy/CdcSchemaHandlerTest.kt | 92 +++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt index 477a524c2..8000f4175 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt @@ -139,10 +139,16 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH "schema strategy requires at least one node key associated with node aliased '$named'." } + val validKeys = keys.filterValues { it.isNotEmpty() } + + require(validKeys.isNotEmpty()) { + "schema strategy requires at least one node key with valid properties aliased '$named'." + } + val node = - Cypher.node(keys.keys.first(), keys.keys.drop(1)) + Cypher.node(validKeys.keys.first(), validKeys.keys.drop(1)) .withProperties( - keys + validKeys .flatMap { it.value } .asSequence() .flatMap { it.asSequence() } diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt index cbf077b08..24a6fea3c 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt @@ -727,6 +727,98 @@ class CdcSchemaHandlerTest { } shouldHaveMessage "update operation requires 'after' field in the event object" } + @Test + fun `should generate correct statement for node deletion events containing null keys property values`() { + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.DELETE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "john"), mapOf("invalid" to null))), + NodeState(emptyList(), mapOf("name" to "john")), + null), + 1, + 0) + verify( + listOf(sinkMessage), + listOf( + listOf( + ChangeQuery( + 1, + 0, + listOf(sinkMessage), + Query( + "MATCH (n:`Person` {name: ${'$'}nName}) DETACH DELETE n", + mapOf("nName" to "john")))))) + } + + @Test + fun `should generate correct statement for node creation events containing null keys property values`() { + val sinkMessage = + newChangeEventMessage( + event = + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "john"), mapOf("invalid" to null))), + null, + NodeState( + listOf("Person"), + mapOf( + "name" to "john", + ))), + txId = 1, + seq = 0) + verify( + listOf(sinkMessage), + listOf( + listOf( + ChangeQuery( + 1, + 0, + listOf(sinkMessage), + Query( + "MERGE (n:`Person` {name: ${'$'}nName}) SET n = ${'$'}nProps", + mapOf( + "nName" to "john", + "nProps" to + mapOf( + "name" to "john", + ))))))) + } + + @Test + fun `should fail to generate query when no keys properties are provided`() { + // given a sink message which contains a key with no properties + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.DELETE, + listOf("Person"), + mapOf("Person" to emptyList()), + NodeState(emptyList(), mapOf("name" to "john")), + null), + 1, + 0) + + // when the key is handled + // then an exception is thrown + shouldThrow { + val handler = + CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + handler.handle(listOf(sinkMessage)) + } + .also { + it shouldHaveMessage + Regex( + "^schema strategy requires at least one node key with valid properties aliased.*$") + } + } + private fun verify(messages: Iterable, expected: Iterable>) { val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) From c4c78d205f8993103f75f88aeebc0376c9643eac Mon Sep 17 00:00:00 2001 From: dhrudevalia Date: Fri, 4 Oct 2024 14:02:11 +0100 Subject: [PATCH 3/4] test: add update example of test --- .../Neo4jCdcSchemaFromStreamsMessageIT.kt | 85 ++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt index c31840554..c178313c1 100644 --- a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt +++ b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt @@ -16,6 +16,7 @@ */ package org.neo4j.connectors.kafka.sink +import io.kotest.assertions.nondeterministic.continually import io.kotest.assertions.nondeterministic.eventually import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.should @@ -219,6 +220,88 @@ class Neo4jCdcSchemaFromStreamsMessageIT { } } + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should update node with a null unique constraint property value`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a database with a single node + session + .run( + "CREATE (n:Person) SET n = ${'$'}props", + mapOf( + "props" to + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ))) + .consume() + + // and an update event adding a new property and label + // which contains a non-existent constraint + val updateEvent = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.updated), + payload = + NodePayload( + id = "Person", + before = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ), + listOf("Person")), + after = + NodeChange( + properties = + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + "location" to "London"), + labels = listOf("Person", "Employee"))), + schema = + Schema( + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the message is published + producer.publish(updateEvent) + + // then the node should exist with its additional properties and labels + eventually(30.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .single() + + result.get("n").asNode() should + { + it.labels() shouldBe listOf("Person", "Employee") + it.asMap() shouldBe + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + "location" to "London") + } + } + } + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) @Test fun `should delete a node with a null unique constraint property value`( @@ -350,7 +433,7 @@ class Neo4jCdcSchemaFromStreamsMessageIT { producer.publish(event) // then the node should not be deleted and should still exist - eventually(10.seconds) { + continually(10.seconds) { val result = session .run( From a446171d6e55c1e55df47e8f4a7349700f0e5f4a Mon Sep 17 00:00:00 2001 From: dhrudevalia Date: Mon, 7 Oct 2024 09:04:15 +0100 Subject: [PATCH 4/4] fix: remove redundant check --- .../neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt | 4 ---- .../connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt index 8000f4175..5031c230c 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt @@ -135,10 +135,6 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH } private fun buildNode(keys: Map>>, named: String): Node { - require(keys.isNotEmpty()) { - "schema strategy requires at least one node key associated with node aliased '$named'." - } - val validKeys = keys.filterValues { it.isNotEmpty() } require(validKeys.isNotEmpty()) { diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt index 24a6fea3c..781221aee 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt @@ -107,7 +107,7 @@ class CdcSchemaHandlerTest { .also { it shouldHaveMessage Regex( - "^schema strategy requires at least one node key associated with node aliased.*$") + "^schema strategy requires at least one node key with valid properties aliased.*$") } } }