diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt index 45a2b1eaa..09d3287e0 100644 --- a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt @@ -70,8 +70,13 @@ object StreamsTransactionEventExtensions { this.schema.constraints .filter { c -> c.type == StreamsConstraintType.UNIQUE } .map { c -> - c.label!! to c.properties.associateWith { referenceState.properties[it] } + c.label!! to + c.properties + .associateWith { referenceState.properties[it] } + // Does not have values associated in the schema + .filterValues { it != null } } + .filter { it.second.isNotEmpty() } .groupBy { it.first } .mapValues { it.value.map { p -> p.second } } diff --git a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt index 6572c8fee..c178313c1 100644 --- a/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt +++ b/sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt @@ -16,6 +16,7 @@ */ package org.neo4j.connectors.kafka.sink +import io.kotest.assertions.nondeterministic.continually import io.kotest.assertions.nondeterministic.eventually import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.should @@ -24,6 +25,7 @@ import java.time.LocalDate import kotlin.time.Duration.Companion.seconds import org.junit.jupiter.api.Test import org.neo4j.connectors.kafka.events.Constraint +import org.neo4j.connectors.kafka.events.EntityType import org.neo4j.connectors.kafka.events.Meta import org.neo4j.connectors.kafka.events.NodeChange import org.neo4j.connectors.kafka.events.NodePayload @@ -158,6 +160,291 @@ class Neo4jCdcSchemaFromStreamsMessageIT { } } + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should create a node with a null unique constraint property value`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a creation event + // with a unique constraint referencing a property that doesn't exist + val event = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.created), + payload = + NodePayload( + id = "1", + type = EntityType.node, + before = null, + after = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ), + listOf("Person")), + ), + schema = + Schema( + properties = + mapOf( + "first_name" to "String", + "last_name" to "String", + "email" to "String", + ), + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the event is published + producer.publish(event) + + // then a new node should exist + eventually(30.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .list() + + result shouldHaveSize 1 + } + } + + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should update node with a null unique constraint property value`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a database with a single node + session + .run( + "CREATE (n:Person) SET n = ${'$'}props", + mapOf( + "props" to + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ))) + .consume() + + // and an update event adding a new property and label + // which contains a non-existent constraint + val updateEvent = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.updated), + payload = + NodePayload( + id = "Person", + before = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ), + listOf("Person")), + after = + NodeChange( + properties = + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + "location" to "London"), + labels = listOf("Person", "Employee"))), + schema = + Schema( + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the message is published + producer.publish(updateEvent) + + // then the node should exist with its additional properties and labels + eventually(30.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .single() + + result.get("n").asNode() should + { + it.labels() shouldBe listOf("Person", "Employee") + it.asMap() shouldBe + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + "location" to "London") + } + } + } + + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should delete a node with a null unique constraint property value`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a database containing 1 node + session + .run( + "CREATE (n:Person) SET n = ${'$'}props", + mapOf( + "props" to + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ))) + .consume() + + // and a deletion event and with a unique constraint referencing a property that doesn't exist + val event = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.deleted), + payload = + NodePayload( + id = "1", + type = EntityType.node, + after = null, + before = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ), + listOf("Person")), + ), + schema = + Schema( + properties = + mapOf( + "first_name" to "String", + "last_name" to "String", + "email" to "String", + ), + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the event is published + producer.publish(event) + + // then the node should no longer exist + eventually(10.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .list() + + result shouldHaveSize 0 + } + } + + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) + @Test + fun `should fail to delete a node when no valid unique constraints are provided`( + @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, + session: Session + ) = runTest { + + // given a database containing 1 node + session + .run( + "CREATE (n:Person) SET n = ${'$'}props", + mapOf( + "props" to + mapOf( + "first_name" to "john", + "last_name" to "smith", + "email" to "john@smith.org", + ))) + .consume() + + // and a deletion event and with a multiple unique constraints which do not have a valid + // property + val event = + StreamsTransactionEvent( + meta = newMetadata(operation = OperationType.deleted), + payload = + NodePayload( + id = "1", + type = EntityType.node, + after = null, + before = + NodeChange( + mapOf( + "first_name" to "john", + "last_name" to "smith", + ), + listOf("Person")), + ), + schema = + Schema( + properties = + mapOf( + "first_name" to "String", + "last_name" to "String", + ), + constraints = + listOf( + Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE), + Constraint( + "Person", + setOf("email"), + StreamsConstraintType.NODE_PROPERTY_EXISTS), + Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)), + )) + + // when the event is published + producer.publish(event) + + // then the node should not be deleted and should still exist + continually(10.seconds) { + val result = + session + .run( + "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n", + mapOf("first_name" to "john")) + .list() + + result shouldHaveSize 1 + } + } + @Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)]) @Test fun `should create relationship`( diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt index 477a524c2..5031c230c 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt @@ -135,14 +135,16 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH } private fun buildNode(keys: Map>>, named: String): Node { - require(keys.isNotEmpty()) { - "schema strategy requires at least one node key associated with node aliased '$named'." + val validKeys = keys.filterValues { it.isNotEmpty() } + + require(validKeys.isNotEmpty()) { + "schema strategy requires at least one node key with valid properties aliased '$named'." } val node = - Cypher.node(keys.keys.first(), keys.keys.drop(1)) + Cypher.node(validKeys.keys.first(), validKeys.keys.drop(1)) .withProperties( - keys + validKeys .flatMap { it.value } .asSequence() .flatMap { it.asSequence() } diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt index cbf077b08..781221aee 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt @@ -107,7 +107,7 @@ class CdcSchemaHandlerTest { .also { it shouldHaveMessage Regex( - "^schema strategy requires at least one node key associated with node aliased.*$") + "^schema strategy requires at least one node key with valid properties aliased.*$") } } } @@ -727,6 +727,98 @@ class CdcSchemaHandlerTest { } shouldHaveMessage "update operation requires 'after' field in the event object" } + @Test + fun `should generate correct statement for node deletion events containing null keys property values`() { + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.DELETE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "john"), mapOf("invalid" to null))), + NodeState(emptyList(), mapOf("name" to "john")), + null), + 1, + 0) + verify( + listOf(sinkMessage), + listOf( + listOf( + ChangeQuery( + 1, + 0, + listOf(sinkMessage), + Query( + "MATCH (n:`Person` {name: ${'$'}nName}) DETACH DELETE n", + mapOf("nName" to "john")))))) + } + + @Test + fun `should generate correct statement for node creation events containing null keys property values`() { + val sinkMessage = + newChangeEventMessage( + event = + NodeEvent( + "node-element-id", + EntityOperation.CREATE, + listOf("Person"), + mapOf("Person" to listOf(mapOf("name" to "john"), mapOf("invalid" to null))), + null, + NodeState( + listOf("Person"), + mapOf( + "name" to "john", + ))), + txId = 1, + seq = 0) + verify( + listOf(sinkMessage), + listOf( + listOf( + ChangeQuery( + 1, + 0, + listOf(sinkMessage), + Query( + "MERGE (n:`Person` {name: ${'$'}nName}) SET n = ${'$'}nProps", + mapOf( + "nName" to "john", + "nProps" to + mapOf( + "name" to "john", + ))))))) + } + + @Test + fun `should fail to generate query when no keys properties are provided`() { + // given a sink message which contains a key with no properties + val sinkMessage = + newChangeEventMessage( + NodeEvent( + "node-element-id", + EntityOperation.DELETE, + listOf("Person"), + mapOf("Person" to emptyList()), + NodeState(emptyList(), mapOf("name" to "john")), + null), + 1, + 0) + + // when the key is handled + // then an exception is thrown + shouldThrow { + val handler = + CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) + + handler.handle(listOf(sinkMessage)) + } + .also { + it shouldHaveMessage + Regex( + "^schema strategy requires at least one node key with valid properties aliased.*$") + } + } + private fun verify(messages: Iterable, expected: Iterable>) { val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))