Skip to content

Commit fb90ba9

Browse files
authored
fix: try to be idempotent about relationship creations (#499)
1 parent ea5a69a commit fb90ba9

File tree

7 files changed

+14
-16
lines changed

7 files changed

+14
-16
lines changed

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSchemaHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class ApocCdcSchemaHandler(val topic: String, neo4j: Neo4j, batchSize: Int) :
114114
val (startMatchLabels, startMatchProperties) = buildMatchLabelsAndProperties(event.start.keys)
115115
val (endMatchLabels, endMatchProperties) = buildMatchLabelsAndProperties(event.end.keys)
116116
val (relMatchType, relMatchProperties) =
117-
buildMatchLabelsAndProperties(event.type, event.keys, null)
117+
buildMatchLabelsAndProperties(event.type, event.keys, event.after.properties)
118118

119119
return CdcRelationshipData(
120120
EntityOperation.CREATE,

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/CdcData.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ data class CdcNodeData(
7070
"MERGE (n$matchLabels$matchProps) SET n += ${'$'}$EVENT.setProperties SET n:\$(${'$'}$EVENT.addLabels) REMOVE n:\$(${'$'}$EVENT.removeLabels)"
7171
}
7272
EntityOperation.DELETE -> {
73-
"MATCH (n$matchLabels$matchProps) DETACH DELETE n"
73+
"MATCH (n$matchLabels$matchProps) DELETE n"
7474
}
7575
}
7676
}
@@ -133,11 +133,7 @@ data class CdcRelationshipData(
133133

134134
return when (operation) {
135135
EntityOperation.CREATE -> {
136-
if (!hasKeys) {
137-
"MATCH (start$startMatchLabels$startMatchProps) MATCH (end$endMatchLabels$endMatchProps) CREATE (start)-[r:$matchType$matchProps]->(end) SET r += ${'$'}$EVENT.setProperties"
138-
} else {
139-
"MATCH (start$startMatchLabels$startMatchProps) MATCH (end$endMatchLabels$endMatchProps) MERGE (start)-[r:$matchType$matchProps]->(end) SET r += ${'$'}$EVENT.setProperties"
140-
}
136+
"MATCH (start$startMatchLabels$startMatchProps) MATCH (end$endMatchLabels$endMatchProps) MERGE (start)-[r:$matchType$matchProps]->(end) SET r += ${'$'}$EVENT.setProperties"
141137
}
142138
EntityOperation.UPDATE -> {
143139
if (!hasKeys) {

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/batch/CdcData.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ data class CdcNodeData(
6666
"MERGE (n:\$($EVENT.matchLabels) {$matchProps}) SET n += $EVENT.setProperties SET n:\$($EVENT.addLabels) REMOVE n:\$($EVENT.removeLabels)"
6767
}
6868
EntityOperation.DELETE -> {
69-
"MATCH (n:\$($EVENT.matchLabels) {$matchProps}) DETACH DELETE n"
69+
"MATCH (n:\$($EVENT.matchLabels) {$matchProps}) DELETE n"
7070
}
7171
}
7272
}

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSchemaHandlerTest.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ class ApocCdcSchemaHandlerTest {
478478
listOf(
479479
mapOf(
480480
"stmt" to
481-
"MATCH (n:Person {name: \$e.matchProperties.name, surname: \$e.matchProperties.surname}) DETACH DELETE n",
481+
"MATCH (n:Person {name: \$e.matchProperties.name, surname: \$e.matchProperties.surname}) DELETE n",
482482
"params" to
483483
mapOf(
484484
"e" to
@@ -543,7 +543,7 @@ class ApocCdcSchemaHandlerTest {
543543
listOf(
544544
mapOf(
545545
"stmt" to
546-
"MATCH (start:Person {id: \$e.start.matchProperties.id}) MATCH (end:Person {id: \$e.end.matchProperties.id}) CREATE (start)-[r:KNOWS]->(end) SET r += \$e.setProperties",
546+
"MATCH (start:Person {id: \$e.start.matchProperties.id}) MATCH (end:Person {id: \$e.end.matchProperties.id}) MERGE (start)-[r:KNOWS {since: \$e.matchProperties.since}]->(end) SET r += \$e.setProperties",
547547
"params" to
548548
mapOf(
549549
"e" to
@@ -559,7 +559,9 @@ class ApocCdcSchemaHandlerTest {
559559
mapOf("id" to 2L)
560560
),
561561
"matchProperties" to
562-
emptyMap<String, Any>(),
562+
mapOf(
563+
"since" to LocalDate.of(2000, 1, 1)
564+
),
563565
"setProperties" to
564566
mapOf(
565567
"since" to LocalDate.of(2000, 1, 1)
@@ -1191,7 +1193,7 @@ class ApocCdcSchemaHandlerTest {
11911193
listOf(
11921194
mapOf(
11931195
"stmt" to
1194-
"MATCH (n:Person {name: \$e.matchProperties.name}) DETACH DELETE n",
1196+
"MATCH (n:Person {name: \$e.matchProperties.name}) DELETE n",
11951197
"params" to
11961198
mapOf(
11971199
"e" to

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSourceIdHandlerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ class ApocCdcSourceIdHandlerTest {
434434
listOf(
435435
mapOf(
436436
"stmt" to
437-
"MATCH (n:SourceEvent {sourceElementId: \$e.matchProperties.sourceElementId}) DETACH DELETE n",
437+
"MATCH (n:SourceEvent {sourceElementId: \$e.matchProperties.sourceElementId}) DELETE n",
438438
"params" to
439439
mapOf(
440440
"e" to

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/batch/BatchedCdcSchemaHandlerTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ class BatchedCdcSchemaHandlerTest {
453453
Query(
454454
"CYPHER 25 UNWIND \$events AS e CALL (e) { WHEN e.q = \$q0 THEN " +
455455
"MATCH (n:\$(e.matchLabels) {name: e.matchProperties.name, surname: e.matchProperties.surname}) " +
456-
"DETACH DELETE n } FINISH",
456+
"DELETE n } FINISH",
457457
mapOf(
458458
"q0" to 0,
459459
"events" to
@@ -1139,7 +1139,7 @@ class BatchedCdcSchemaHandlerTest {
11391139
null,
11401140
listOf(sinkMessage),
11411141
Query(
1142-
"CYPHER 25 UNWIND \$events AS e CALL (e) { WHEN e.q = \$q0 THEN MATCH (n:\$(e.matchLabels) {name: e.matchProperties.name}) DETACH DELETE n } FINISH",
1142+
"CYPHER 25 UNWIND \$events AS e CALL (e) { WHEN e.q = \$q0 THEN MATCH (n:\$(e.matchLabels) {name: e.matchProperties.name}) DELETE n } FINISH",
11431143
mapOf(
11441144
"q0" to 0,
11451145
"events" to

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/batch/BatchedCdcSourceIdHandlerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ class BatchedCdcSourceIdHandlerTest {
400400
Query(
401401
"CYPHER 25 UNWIND \$events AS e CALL (e) { WHEN e.q = \$q0 THEN " +
402402
"MATCH (n:\$(e.matchLabels) {sourceElementId: e.matchProperties.sourceElementId}) " +
403-
"DETACH DELETE n } FINISH",
403+
"DELETE n } FINISH",
404404
mapOf(
405405
"q0" to 0,
406406
"events" to

0 commit comments

Comments
 (0)