Skip to content

Commit bf2e1c4

Browse files
authored
feat: make CDC message key serialization configurable (#52)
Allow users to select a key serialization strategy.
1 parent 0bd49c0 commit bf2e1c4

File tree

14 files changed

+774
-31
lines changed

14 files changed

+774
-31
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,17 @@ object ChangeEventExtensions {
125125

126126
private fun NodeEvent.toConnectValue(schema: Schema): Struct =
127127
Struct(schema).also {
128+
val keys =
129+
if (this.keys.isEmpty()) {
130+
null
131+
} else {
132+
DynamicTypes.valueFor(schema.field("keys").schema(), this.keys)
133+
}
128134
it.put("elementId", this.elementId)
129135
it.put("eventType", this.eventType.name)
130136
it.put("operation", this.operation.name)
131137
it.put("labels", this.labels)
132-
it.put("keys", DynamicTypes.valueFor(schema.field("keys").schema(), this.keys))
138+
it.put("keys", keys)
133139
it.put("state", nodeStateValue(schema.field("state").schema(), this.before, this.after))
134140
}
135141

@@ -208,13 +214,19 @@ object ChangeEventExtensions {
208214

209215
private fun RelationshipEvent.toConnectValue(schema: Schema): Struct =
210216
Struct(schema).also {
217+
val keys =
218+
if (this.keys.isEmpty()) {
219+
null
220+
} else {
221+
DynamicTypes.valueFor(schema.field("keys").schema(), this.keys)
222+
}
211223
it.put("elementId", this.elementId)
212224
it.put("eventType", this.eventType.name)
213225
it.put("operation", this.operation.name)
214226
it.put("type", this.type)
215227
it.put("start", this.start.toConnectValue(schema.field("start").schema()))
216228
it.put("end", this.end.toConnectValue(schema.field("end").schema()))
217-
it.put("keys", DynamicTypes.valueFor(schema.field("keys").schema(), this.keys))
229+
it.put("keys", keys)
218230
it.put(
219231
"state",
220232
relationshipStateValue(schema.field("state").schema(), this.before, this.after))

common/src/test/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensionsTest.kt

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,43 @@ class ChangeEventExtensionsTest {
929929
.put("since", "2000-01-01"))))
930930
}
931931

932+
@Test
933+
fun `node event keys should be nullified when node keys are not defined`() {
934+
val (_, _, schema, value) =
935+
newChangeEvent(
936+
NodeEvent(
937+
"element-0",
938+
EntityOperation.CREATE,
939+
listOf("Label1", "Label2"),
940+
mapOf(),
941+
null,
942+
NodeState(listOf("Label1"), mapOf("id" to 5L))))
943+
944+
val expectedKeySchema = SchemaBuilder.struct().optional().build()
945+
schema.nestedSchema("event.keys") shouldBe expectedKeySchema
946+
value.nestedValue("event.keys") shouldBe null
947+
}
948+
949+
@Test
950+
fun `relationship event keys should be nullified when rel keys are not defined`() {
951+
val (_, _, schema, value) =
952+
newChangeEvent(
953+
RelationshipEvent(
954+
"rel-0",
955+
"WORKS_FOR",
956+
Node("node-0", listOf("Person"), mapOf()),
957+
Node("node-1", listOf("Company"), mapOf()),
958+
listOf(),
959+
EntityOperation.DELETE,
960+
RelationshipState(mapOf("id" to 5L)),
961+
null))
962+
963+
val expectedKeySchema =
964+
SchemaBuilder.array(SchemaBuilder.struct().optional().build()).optional().build()
965+
schema.nestedSchema("event.keys") shouldBe expectedKeySchema
966+
value.nestedValue("event.keys") shouldBe null
967+
}
968+
932969
data class ChangeEventResult<T : Event>(
933970
val event: T,
934971
val change: ChangeEvent,
@@ -964,12 +1001,20 @@ class ChangeEventExtensionsTest {
9641001
private fun Schema.nestedSchema(path: String): Schema {
9651002
require(path.isNotBlank())
9661003

967-
var current = this
968-
path.split('.').forEach { current = current.field(it).schema() }
969-
return current
1004+
return path.split('.').fold(this) { schema, field -> schema.field(field).schema() }
9701005
}
9711006

9721007
private fun Schema.nestedValueSchema(path: String): Schema {
9731008
return nestedSchema(path).valueSchema()
9741009
}
1010+
1011+
private fun Struct.nestedValue(path: String): Any? {
1012+
require(path.isNotBlank())
1013+
1014+
val fields = path.split('.')
1015+
return fields
1016+
.dropLast(1)
1017+
.fold(this) { struct, field -> struct[field] as Struct }
1018+
.get(fields.last())
1019+
}
9751020
}

legacy-connectors/src/test/kotlin/streams/kafka/connect/source/LegacyNeo4jSourceIT.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class LegacyNeo4jSourceIT {
6565
mapOf("execId" to executionId))
6666
.consume()
6767

68+
@Suppress("DEPRECATION")
6869
TopicVerifier.create(consumer)
6970
.expectMessageValueMatching { value ->
7071
value.asMap().excludingKeys("timestamp") ==

0 commit comments

Comments
 (0)