Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ object StreamsTransactionEventExtensions {
this.schema.constraints
.filter { c -> c.type == StreamsConstraintType.UNIQUE }
.map { c ->
c.label!! to c.properties.associateWith { referenceState.properties[it] }
c.label!! to
c.properties
.associateWith { referenceState.properties[it] }
// Does not have values associated in the schema
.filterValues { it != null }
}
.filter { it.second.isNotEmpty() }
.groupBy { it.first }
.mapValues { it.value.map { p -> p.second } }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.neo4j.connectors.kafka.sink

import io.kotest.assertions.nondeterministic.continually
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.should
Expand All @@ -24,6 +25,7 @@ import java.time.LocalDate
import kotlin.time.Duration.Companion.seconds
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.events.Constraint
import org.neo4j.connectors.kafka.events.EntityType
import org.neo4j.connectors.kafka.events.Meta
import org.neo4j.connectors.kafka.events.NodeChange
import org.neo4j.connectors.kafka.events.NodePayload
Expand Down Expand Up @@ -158,6 +160,291 @@ class Neo4jCdcSchemaFromStreamsMessageIT {
}
}

@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
@Test
fun `should create a node with a null unique constraint property value`(
    @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
    session: Session
) = runTest {

  // given: a creation event whose schema declares a UNIQUE constraint on a
  // property ("invalid") that is absent from the node's actual properties
  val nodeProperties =
      mapOf(
          "first_name" to "john",
          "last_name" to "smith",
          "email" to "john@smith.org",
      )
  val event =
      StreamsTransactionEvent(
          meta = newMetadata(operation = OperationType.created),
          payload =
              NodePayload(
                  id = "1",
                  type = EntityType.node,
                  before = null,
                  after = NodeChange(nodeProperties, listOf("Person")),
              ),
          schema =
              Schema(
                  properties =
                      mapOf(
                          "first_name" to "String",
                          "last_name" to "String",
                          "email" to "String",
                      ),
                  constraints =
                      listOf(
                          Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
                          Constraint(
                              "Person",
                              setOf("email"),
                              StreamsConstraintType.NODE_PROPERTY_EXISTS),
                          Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
              ))

  // when: the event is published to the sink topic
  producer.publish(event)

  // then: a matching node eventually appears in the database
  eventually(30.seconds) {
    val matches =
        session
            .run(
                "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
                mapOf("first_name" to "john"))
            .list()

    matches shouldHaveSize 1
  }
}

@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
@Test
fun `should update node with a null unique constraint property value`(
    @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
    session: Session
) = runTest {

  // given a database with a single node
  session
      .run(
          "CREATE (n:Person) SET n = ${'$'}props",
          mapOf(
              "props" to
                  mapOf(
                      "first_name" to "john",
                      "last_name" to "smith",
                      "email" to "john@smith.org",
                  )))
      .consume()

  // and an update event adding a new property and label,
  // whose schema also carries a UNIQUE constraint on a non-existent property
  val updateEvent =
      StreamsTransactionEvent(
          meta = newMetadata(operation = OperationType.updated),
          payload =
              NodePayload(
                  // id/type aligned with the sibling tests in this class
                  // (was id = "Person" with no type, unlike every other event here)
                  id = "1",
                  type = EntityType.node,
                  before =
                      NodeChange(
                          mapOf(
                              "first_name" to "john",
                              "last_name" to "smith",
                              "email" to "john@smith.org",
                          ),
                          listOf("Person")),
                  after =
                      NodeChange(
                          properties =
                              mapOf(
                                  "first_name" to "john",
                                  "last_name" to "smith",
                                  "email" to "john@smith.org",
                                  "location" to "London"),
                          labels = listOf("Person", "Employee"))),
          schema =
              Schema(
                  constraints =
                      listOf(
                          Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
                          Constraint(
                              "Person",
                              setOf("email"),
                              StreamsConstraintType.NODE_PROPERTY_EXISTS),
                          Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
              ))

  // when the message is published
  producer.publish(updateEvent)

  // then the node should exist with its additional properties and labels
  eventually(30.seconds) {
    val result =
        session
            .run(
                "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
                mapOf("first_name" to "john"))
            .single()

    result.get("n").asNode() should
        {
          it.labels() shouldBe listOf("Person", "Employee")
          it.asMap() shouldBe
              mapOf(
                  "first_name" to "john",
                  "last_name" to "smith",
                  "email" to "john@smith.org",
                  "location" to "London")
        }
  }
}

@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
@Test
fun `should delete a node with a null unique constraint property value`(
    @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
    session: Session
) = runTest {

  // given: the database already contains the node that will be deleted
  val personProps =
      mapOf(
          "first_name" to "john",
          "last_name" to "smith",
          "email" to "john@smith.org",
      )
  session
      .run("CREATE (n:Person) SET n = ${'$'}props", mapOf("props" to personProps))
      .consume()

  // and: a deletion event whose schema declares a UNIQUE constraint on a
  // property ("invalid") that the node never had
  val event =
      StreamsTransactionEvent(
          meta = newMetadata(operation = OperationType.deleted),
          payload =
              NodePayload(
                  id = "1",
                  type = EntityType.node,
                  after = null,
                  before = NodeChange(personProps, listOf("Person")),
              ),
          schema =
              Schema(
                  properties =
                      mapOf(
                          "first_name" to "String",
                          "last_name" to "String",
                          "email" to "String",
                      ),
                  constraints =
                      listOf(
                          Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
                          Constraint(
                              "Person",
                              setOf("email"),
                              StreamsConstraintType.NODE_PROPERTY_EXISTS),
                          Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
              ))

  // when: the event is published
  producer.publish(event)

  // then: the node is eventually removed from the database
  eventually(10.seconds) {
    val matches =
        session
            .run(
                "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
                mapOf("first_name" to "john"))
            .list()

    matches shouldHaveSize 0
  }
}

@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
@Test
fun `should fail to delete a node when no valid unique constraints are provided`(
    @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
    session: Session
) = runTest {

  // given: a database containing a single node (note: no "email" property)
  val personProps =
      mapOf(
          "first_name" to "john",
          "last_name" to "smith",
      )
  session
      .run("CREATE (n:Person) SET n = ${'$'}props", mapOf("props" to personProps))
      .consume()

  // and: a deletion event where none of the UNIQUE constraints reference a
  // property present on the node
  val event =
      StreamsTransactionEvent(
          meta = newMetadata(operation = OperationType.deleted),
          payload =
              NodePayload(
                  id = "1",
                  type = EntityType.node,
                  after = null,
                  before = NodeChange(personProps, listOf("Person")),
              ),
          schema =
              Schema(
                  properties =
                      mapOf(
                          "first_name" to "String",
                          "last_name" to "String",
                      ),
                  constraints =
                      listOf(
                          Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
                          Constraint(
                              "Person",
                              setOf("email"),
                              StreamsConstraintType.NODE_PROPERTY_EXISTS),
                          Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
              ))

  // when: the event is published
  producer.publish(event)

  // then: the node must keep existing for the whole observation window,
  // i.e. the delete is rejected rather than matching on a null key
  continually(10.seconds) {
    val matches =
        session
            .run(
                "MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
                mapOf("first_name" to "john"))
            .list()

    matches shouldHaveSize 1
  }
}

@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
@Test
fun `should create relationship`(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
}

private fun buildNode(keys: Map<String, List<Map<String, Any>>>, named: String): Node {
require(keys.isNotEmpty()) {
"schema strategy requires at least one node key associated with node aliased '$named'."
val validKeys = keys.filterValues { it.isNotEmpty() }

require(validKeys.isNotEmpty()) {
"schema strategy requires at least one node key with valid properties aliased '$named'."
}

val node =
Cypher.node(keys.keys.first(), keys.keys.drop(1))
Cypher.node(validKeys.keys.first(), validKeys.keys.drop(1))
.withProperties(
keys
validKeys
.flatMap { it.value }
.asSequence()
.flatMap { it.asSequence() }
Expand Down
Loading