Skip to content

Commit 98c7afb

Browse files
committed
fix: relationship with keys without node keys
1 parent 36cf8e6 commit 98c7afb

File tree

4 files changed

+109
-5
lines changed

4 files changed

+109
-5
lines changed

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/SinkAction.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,13 @@ data class MergeRelationshipSinkAction(
146146
val setProperties: Map<String, Any?>,
147147
) : SinkAction() {
148148
init {
149-
require(startNode != SinkActionNodeReference.MATCH_ANY) {
150-
"start node matcher must contain at least one key property for relationship merge action."
151-
}
152-
require(endNode != SinkActionNodeReference.MATCH_ANY) {
153-
"end node matcher must contain at least one key property for relationship merge action."
149+
if (matcher is RelationshipMatcher.ByTypeAndProperties && !matcher.hasKeys) {
150+
require(startNode != SinkActionNodeReference.MATCH_ANY) {
151+
"start node matcher must contain at least one key property for relationship merge action."
152+
}
153+
require(endNode != SinkActionNodeReference.MATCH_ANY) {
154+
"end node matcher must contain at least one key property for relationship merge action."
155+
}
154156
}
155157
}
156158
}

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaEventTransformer.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.neo4j.connectors.kafka.sink.strategy.NodeMatcher
2828
import org.neo4j.connectors.kafka.sink.strategy.RelationshipMatcher
2929
import org.neo4j.connectors.kafka.sink.strategy.SinkAction
3030
import org.neo4j.connectors.kafka.sink.strategy.SinkActionNodeReference
31+
import org.neo4j.connectors.kafka.sink.strategy.UpdateRelationshipSinkAction
3132
import org.neo4j.connectors.kafka.sink.strategy.addedLabels
3233
import org.neo4j.connectors.kafka.sink.strategy.mutatedProperties
3334
import org.neo4j.connectors.kafka.sink.strategy.removedLabels
@@ -139,6 +140,28 @@ class CdcSchemaEventTransformer(val topic: String) : CdcEventTransformer {
139140
val (relMatchType, relMatchProperties) =
140141
buildMatchLabelsAndProperties(event.type, relationshipKeys, event.before.properties)
141142

143+
// If there are no keys to match the relationship start and end nodes, then we should not use
144+
// merge on the relationship as it may lead to unintended creation of relationships. Instead,
145+
// we use an Update operation.
146+
if (startMatchProperties.isEmpty() || endMatchProperties.isEmpty()) {
147+
return UpdateRelationshipSinkAction(
148+
SinkActionNodeReference(
149+
NodeMatcher.ByLabelsAndProperties(startMatchLabels, startMatchProperties),
150+
LookupMode.MATCH,
151+
),
152+
SinkActionNodeReference(
153+
NodeMatcher.ByLabelsAndProperties(endMatchLabels, endMatchProperties),
154+
LookupMode.MATCH,
155+
),
156+
RelationshipMatcher.ByTypeAndProperties(
157+
relMatchType,
158+
relMatchProperties,
159+
relationshipKeys.isNotEmpty(),
160+
),
161+
event.mutatedProperties(),
162+
)
163+
}
164+
142165
return MergeRelationshipSinkAction(
143166
SinkActionNodeReference(
144167
NodeMatcher.ByLabelsAndProperties(startMatchLabels, startMatchProperties),

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaEventTransformerTest.kt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.neo4j.connectors.kafka.sink.strategy.MergeRelationshipSinkAction
4343
import org.neo4j.connectors.kafka.sink.strategy.NodeMatcher
4444
import org.neo4j.connectors.kafka.sink.strategy.RelationshipMatcher
4545
import org.neo4j.connectors.kafka.sink.strategy.SinkActionNodeReference
46+
import org.neo4j.connectors.kafka.sink.strategy.UpdateRelationshipSinkAction
4647
import org.neo4j.connectors.kafka.utils.JSONUtils
4748

4849
class CdcSchemaEventTransformerTest {
@@ -225,6 +226,35 @@ class CdcSchemaEventTransformerTest {
225226
)
226227
}
227228

229+
@Test
230+
fun `should transform relationship update event with keys but without node keys to an update action`() {
231+
val event =
232+
RelationshipEvent(
233+
"rel-id",
234+
"KNOWS",
235+
Node("s-id", listOf("Person"), mapOf("Person" to emptyList<Map<String, Any>>())),
236+
Node("e-id", emptyList<String>(), emptyMap<String, List<Map<String, Any>>>()),
237+
listOf(mapOf("relId" to "R1")),
238+
EntityOperation.UPDATE,
239+
RelationshipState(mapOf("since" to 2020)),
240+
RelationshipState(mapOf("since" to 2021, "rating" to 5)),
241+
)
242+
243+
transformer.transform(changeEvent(event)) shouldBe
244+
UpdateRelationshipSinkAction(
245+
SinkActionNodeReference(
246+
NodeMatcher.ByLabelsAndProperties(emptySet(), emptyMap()),
247+
LookupMode.MATCH,
248+
),
249+
SinkActionNodeReference(
250+
NodeMatcher.ByLabelsAndProperties(emptySet(), emptyMap()),
251+
LookupMode.MATCH,
252+
),
253+
RelationshipMatcher.ByTypeAndProperties("KNOWS", mapOf("relId" to "R1"), true),
254+
mapOf("since" to 2021, "rating" to 5),
255+
)
256+
}
257+
228258
@Test
229259
fun `should transform relationship deletion event`() {
230260
val event =

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaHandlerIT.kt

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,55 @@ abstract class CdcSchemaHandlerIT(
921921
verifyEosOffsetIfEnabled(session, CDC_SCHEMA, eosOffsetLabel, 0)
922922
}
923923

924+
@Test
925+
fun `should update relationship with key without any node key properties`() {
926+
session.createNodeKeyConstraint(neo4j(), "user_key", "User", "userId")
927+
session.createNodeKeyConstraint(neo4j(), "product_key", "Product", "productId")
928+
session.createRelationshipKeyConstraint(neo4j(), "purchased_key", "PURCHASED", "orderId")
929+
session
930+
.run(
931+
"CREATE (:User {userId: 'user1'})-[:PURCHASED {orderId: 'order-123', amount: 50.00}]->(:Product {productId: 'product1'})"
932+
)
933+
.consume()
934+
935+
task.put(
936+
listOf(
937+
newChangeEventMessage(
938+
RelationshipEvent(
939+
"rel-1",
940+
"PURCHASED",
941+
Node("node-1", listOf("User"), emptyMap()), // Missing userId key
942+
Node("node-2", listOf("Product"), emptyMap()), // Missing productId key
943+
listOf(mapOf("orderId" to "order-123")),
944+
EntityOperation.UPDATE,
945+
RelationshipState(mapOf("orderId" to "order-123", "amount" to 50.00)),
946+
RelationshipState(
947+
mapOf(
948+
"orderId" to "order-123",
949+
"amount" to 75.00,
950+
"status" to "updated",
951+
)
952+
),
953+
),
954+
1,
955+
0,
956+
0,
957+
)
958+
.record
959+
)
960+
)
961+
962+
session
963+
.run(
964+
"MATCH (:User {userId: 'user1'})-[r:PURCHASED]->(:Product {productId: 'product1'}) RETURN r{.*}"
965+
)
966+
.single()
967+
.get(0)
968+
.asMap() shouldBe mapOf("orderId" to "order-123", "amount" to 75.00, "status" to "updated")
969+
970+
verifyEosOffsetIfEnabled(session, CDC_SCHEMA, eosOffsetLabel, 0)
971+
}
972+
924973
@Test
925974
fun `should delete relationship with key`() {
926975
session.createNodeKeyConstraint(neo4j(), "user_key", "User", "userId")

0 commit comments

Comments
 (0)