Skip to content

Commit 750f829

Browse files
authored
feat: support new CDC keys structure (#46)
1 parent 3f8dbe4 commit 750f829

File tree

3 files changed

+236
-97
lines changed

3 files changed

+236
-97
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ object ChangeEventExtensions {
119119
.field("eventType", SimpleTypes.STRING.schema())
120120
.field("operation", SimpleTypes.STRING.schema())
121121
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
122-
.field("keys", schemaForKeys(this.keys))
122+
.field("keys", schemaForKeysByLabel(this.keys))
123123
.field(
124124
"state",
125125
SchemaBuilder.struct()
@@ -154,7 +154,7 @@ object ChangeEventExtensions {
154154
.field("type", SimpleTypes.STRING.schema())
155155
.field("start", this.start.toConnectSchema())
156156
.field("end", this.end.toConnectSchema())
157-
.field("key", schemaForKey(this.key))
157+
.field("keys", schemaForKeys(this.keys))
158158
.field(
159159
"state",
160160
SchemaBuilder.struct()
@@ -172,7 +172,7 @@ object ChangeEventExtensions {
172172
it.put("type", this.type)
173173
it.put("start", this.start.toConnectValue(schema.field("start").schema()))
174174
it.put("end", this.end.toConnectValue(schema.field("end").schema()))
175-
it.put("key", DynamicTypes.valueFor(schema.field("key").schema(), this.key))
175+
it.put("keys", DynamicTypes.valueFor(schema.field("keys").schema(), this.keys))
176176
it.put(
177177
"state",
178178
schema.field("state").schema().let { stateSchema ->
@@ -225,7 +225,7 @@ object ChangeEventExtensions {
225225
.namespaced("cdc.Node")
226226
.field("elementId", SimpleTypes.STRING.schema())
227227
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
228-
.field("keys", DynamicTypes.schemaFor(this.keys, true))
228+
.field("keys", schemaForKeysByLabel(this.keys))
229229
.build()
230230
}
231231

@@ -271,16 +271,25 @@ object ChangeEventExtensions {
271271
}
272272
}
273273

274-
private fun schemaForKeys(keys: Map<String, Map<String, Any>>): Schema {
274+
private fun schemaForKeysByLabel(keys: Map<String, List<Map<String, Any>>>): Schema {
275275
return SchemaBuilder.struct()
276-
.apply { keys.forEach { field(it.key, schemaForKey(it.value)) } }
276+
.apply { keys.forEach { field(it.key, schemaForKeys(it.value)) } }
277277
.optional()
278278
.build()
279279
}
280280

281-
private fun schemaForKey(key: Map<String, Any>): Schema {
282-
return SchemaBuilder.struct()
283-
.apply { key.forEach { field(it.key, DynamicTypes.schemaFor(it.value, true)) } }
281+
private fun schemaForKeys(keys: List<Map<String, Any>>): Schema {
282+
return SchemaBuilder.array(
283+
// We need to define a uniform structure of key array elements. Because all elements
284+
// must have identical structure, we list all available keys as optional fields.
285+
SchemaBuilder.struct()
286+
.apply {
287+
keys.forEach { key ->
288+
key.forEach { field(it.key, DynamicTypes.schemaFor(it.value, true)) }
289+
}
290+
}
291+
.optional()
292+
.build())
284293
.optional()
285294
.build()
286295
}

0 commit comments

Comments
 (0)