Skip to content

Commit 855f7eb

Browse files
authored
fix: filter invalid keys from node change events (#187)
1 parent 327c05c commit 855f7eb

File tree

4 files changed

+392
-6
lines changed

4 files changed

+392
-6
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/StreamsTransactionEventExtensions.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,13 @@ object StreamsTransactionEventExtensions {
7070
this.schema.constraints
7171
.filter { c -> c.type == StreamsConstraintType.UNIQUE }
7272
.map { c ->
73-
c.label!! to c.properties.associateWith { referenceState.properties[it] }
73+
c.label!! to
74+
c.properties
75+
.associateWith { referenceState.properties[it] }
76+
// Does not have values associated in the schema
77+
.filterValues { it != null }
7478
}
79+
.filter { it.second.isNotEmpty() }
7580
.groupBy { it.first }
7681
.mapValues { it.value.map { p -> p.second } }
7782

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.neo4j.connectors.kafka.sink
1818

19+
import io.kotest.assertions.nondeterministic.continually
1920
import io.kotest.assertions.nondeterministic.eventually
2021
import io.kotest.matchers.collections.shouldHaveSize
2122
import io.kotest.matchers.should
@@ -24,6 +25,7 @@ import java.time.LocalDate
2425
import kotlin.time.Duration.Companion.seconds
2526
import org.junit.jupiter.api.Test
2627
import org.neo4j.connectors.kafka.events.Constraint
28+
import org.neo4j.connectors.kafka.events.EntityType
2729
import org.neo4j.connectors.kafka.events.Meta
2830
import org.neo4j.connectors.kafka.events.NodeChange
2931
import org.neo4j.connectors.kafka.events.NodePayload
@@ -158,6 +160,291 @@ class Neo4jCdcSchemaFromStreamsMessageIT {
158160
}
159161
}
160162

163+
@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
164+
@Test
165+
fun `should create a node with a null unique constraint property value`(
166+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
167+
session: Session
168+
) = runTest {
169+
170+
// given a creation event
171+
// with a unique constraint referencing a property that doesn't exist
172+
val event =
173+
StreamsTransactionEvent(
174+
meta = newMetadata(operation = OperationType.created),
175+
payload =
176+
NodePayload(
177+
id = "1",
178+
type = EntityType.node,
179+
before = null,
180+
after =
181+
NodeChange(
182+
mapOf(
183+
"first_name" to "john",
184+
"last_name" to "smith",
185+
"email" to "john@smith.org",
186+
),
187+
listOf("Person")),
188+
),
189+
schema =
190+
Schema(
191+
properties =
192+
mapOf(
193+
"first_name" to "String",
194+
"last_name" to "String",
195+
"email" to "String",
196+
),
197+
constraints =
198+
listOf(
199+
Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
200+
Constraint(
201+
"Person",
202+
setOf("email"),
203+
StreamsConstraintType.NODE_PROPERTY_EXISTS),
204+
Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
205+
))
206+
207+
// when the event is published
208+
producer.publish(event)
209+
210+
// then a new node should exist
211+
eventually(30.seconds) {
212+
val result =
213+
session
214+
.run(
215+
"MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
216+
mapOf("first_name" to "john"))
217+
.list()
218+
219+
result shouldHaveSize 1
220+
}
221+
}
222+
223+
@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
224+
@Test
225+
fun `should update node with a null unique constraint property value`(
226+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
227+
session: Session
228+
) = runTest {
229+
230+
// given a database with a single node
231+
session
232+
.run(
233+
"CREATE (n:Person) SET n = ${'$'}props",
234+
mapOf(
235+
"props" to
236+
mapOf(
237+
"first_name" to "john",
238+
"last_name" to "smith",
239+
"email" to "john@smith.org",
240+
)))
241+
.consume()
242+
243+
// and an update event adding a new property and label
244+
// which contains a non-existent constraint
245+
val updateEvent =
246+
StreamsTransactionEvent(
247+
meta = newMetadata(operation = OperationType.updated),
248+
payload =
249+
NodePayload(
250+
id = "Person",
251+
before =
252+
NodeChange(
253+
mapOf(
254+
"first_name" to "john",
255+
"last_name" to "smith",
256+
"email" to "john@smith.org",
257+
),
258+
listOf("Person")),
259+
after =
260+
NodeChange(
261+
properties =
262+
mapOf(
263+
"first_name" to "john",
264+
"last_name" to "smith",
265+
"email" to "john@smith.org",
266+
"location" to "London"),
267+
labels = listOf("Person", "Employee"))),
268+
schema =
269+
Schema(
270+
constraints =
271+
listOf(
272+
Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
273+
Constraint(
274+
"Person",
275+
setOf("email"),
276+
StreamsConstraintType.NODE_PROPERTY_EXISTS),
277+
Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
278+
))
279+
280+
// when the message is published
281+
producer.publish(updateEvent)
282+
283+
// then the node should exist with its additional properties and labels
284+
eventually(30.seconds) {
285+
val result =
286+
session
287+
.run(
288+
"MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
289+
mapOf("first_name" to "john"))
290+
.single()
291+
292+
result.get("n").asNode() should
293+
{
294+
it.labels() shouldBe listOf("Person", "Employee")
295+
it.asMap() shouldBe
296+
mapOf(
297+
"first_name" to "john",
298+
"last_name" to "smith",
299+
"email" to "john@smith.org",
300+
"location" to "London")
301+
}
302+
}
303+
}
304+
305+
@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
306+
@Test
307+
fun `should delete a node with a null unique constraint property value`(
308+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
309+
session: Session
310+
) = runTest {
311+
312+
// given a database containing 1 node
313+
session
314+
.run(
315+
"CREATE (n:Person) SET n = ${'$'}props",
316+
mapOf(
317+
"props" to
318+
mapOf(
319+
"first_name" to "john",
320+
"last_name" to "smith",
321+
"email" to "john@smith.org",
322+
)))
323+
.consume()
324+
325+
// and a deletion event with a unique constraint referencing a property that doesn't exist
326+
val event =
327+
StreamsTransactionEvent(
328+
meta = newMetadata(operation = OperationType.deleted),
329+
payload =
330+
NodePayload(
331+
id = "1",
332+
type = EntityType.node,
333+
after = null,
334+
before =
335+
NodeChange(
336+
mapOf(
337+
"first_name" to "john",
338+
"last_name" to "smith",
339+
"email" to "john@smith.org",
340+
),
341+
listOf("Person")),
342+
),
343+
schema =
344+
Schema(
345+
properties =
346+
mapOf(
347+
"first_name" to "String",
348+
"last_name" to "String",
349+
"email" to "String",
350+
),
351+
constraints =
352+
listOf(
353+
Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
354+
Constraint(
355+
"Person",
356+
setOf("email"),
357+
StreamsConstraintType.NODE_PROPERTY_EXISTS),
358+
Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
359+
))
360+
361+
// when the event is published
362+
producer.publish(event)
363+
364+
// then the node should no longer exist
365+
eventually(10.seconds) {
366+
val result =
367+
session
368+
.run(
369+
"MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
370+
mapOf("first_name" to "john"))
371+
.list()
372+
373+
result shouldHaveSize 0
374+
}
375+
}
376+
377+
@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
378+
@Test
379+
fun `should fail to delete a node when no valid unique constraints are provided`(
380+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
381+
session: Session
382+
) = runTest {
383+
384+
// given a database containing 1 node
385+
session
386+
.run(
387+
"CREATE (n:Person) SET n = ${'$'}props",
388+
mapOf(
389+
"props" to
390+
mapOf(
391+
"first_name" to "john",
392+
"last_name" to "smith",
393+
"email" to "john@smith.org",
394+
)))
395+
.consume()
396+
397+
// and a deletion event with multiple unique constraints which do not have a valid
398+
// property
399+
val event =
400+
StreamsTransactionEvent(
401+
meta = newMetadata(operation = OperationType.deleted),
402+
payload =
403+
NodePayload(
404+
id = "1",
405+
type = EntityType.node,
406+
after = null,
407+
before =
408+
NodeChange(
409+
mapOf(
410+
"first_name" to "john",
411+
"last_name" to "smith",
412+
),
413+
listOf("Person")),
414+
),
415+
schema =
416+
Schema(
417+
properties =
418+
mapOf(
419+
"first_name" to "String",
420+
"last_name" to "String",
421+
),
422+
constraints =
423+
listOf(
424+
Constraint("Person", setOf("email"), StreamsConstraintType.UNIQUE),
425+
Constraint(
426+
"Person",
427+
setOf("email"),
428+
StreamsConstraintType.NODE_PROPERTY_EXISTS),
429+
Constraint("Person", setOf("invalid"), StreamsConstraintType.UNIQUE)),
430+
))
431+
432+
// when the event is published
433+
producer.publish(event)
434+
435+
// then the node should not be deleted and should still exist
436+
continually(10.seconds) {
437+
val result =
438+
session
439+
.run(
440+
"MATCH (n:Person {first_name: ${'$'}first_name}) RETURN n",
441+
mapOf("first_name" to "john"))
442+
.list()
443+
444+
result shouldHaveSize 1
445+
}
446+
}
447+
161448
@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])
162449
@Test
163450
fun `should create relationship`(

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,16 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
135135
}
136136

137137
private fun buildNode(keys: Map<String, List<Map<String, Any>>>, named: String): Node {
138-
require(keys.isNotEmpty()) {
139-
"schema strategy requires at least one node key associated with node aliased '$named'."
138+
val validKeys = keys.filterValues { it.isNotEmpty() }
139+
140+
require(validKeys.isNotEmpty()) {
141+
"schema strategy requires at least one node key with valid properties aliased '$named'."
140142
}
141143

142144
val node =
143-
Cypher.node(keys.keys.first(), keys.keys.drop(1))
145+
Cypher.node(validKeys.keys.first(), validKeys.keys.drop(1))
144146
.withProperties(
145-
keys
147+
validKeys
146148
.flatMap { it.value }
147149
.asSequence()
148150
.flatMap { it.asSequence() }

0 commit comments

Comments
 (0)