Skip to content

Commit 39431db

Browse files
authored
feat: add support for constructing change events (#60)
1 parent b9870d9 commit 39431db

File tree

7 files changed

+948
-201
lines changed

7 files changed

+948
-201
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt

Lines changed: 140 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,18 @@ import org.apache.kafka.connect.data.SchemaAndValue
2222
import org.apache.kafka.connect.data.SchemaBuilder
2323
import org.apache.kafka.connect.data.Struct
2424
import org.neo4j.cdc.client.model.ChangeEvent
25+
import org.neo4j.cdc.client.model.ChangeIdentifier
26+
import org.neo4j.cdc.client.model.EntityOperation
2527
import org.neo4j.cdc.client.model.Event
28+
import org.neo4j.cdc.client.model.EventType
2629
import org.neo4j.cdc.client.model.Metadata
2730
import org.neo4j.cdc.client.model.Node
2831
import org.neo4j.cdc.client.model.NodeEvent
2932
import org.neo4j.cdc.client.model.NodeState
3033
import org.neo4j.cdc.client.model.RelationshipEvent
3134
import org.neo4j.cdc.client.model.RelationshipState
35+
import org.neo4j.connectors.kafka.data.ChangeEventExtensions.toConnectValue
36+
import org.neo4j.connectors.kafka.data.ChangeEventExtensions.toNode
3237

3338
object ChangeEventExtensions {
3439

@@ -37,6 +42,14 @@ object ChangeEventExtensions {
3742
return SchemaAndValue(schema, this.toConnectValue(schema))
3843
}
3944

45+
fun Struct.toChangeEvent(): ChangeEvent =
46+
ChangeEvent(
47+
ChangeIdentifier(getString("id")),
48+
getInt64("txId"),
49+
getInt64("seq").toInt(),
50+
getStruct("metadata").toMetadata(),
51+
getStruct("event").toEvent())
52+
4053
private fun ChangeEvent.toConnectSchema(): Schema =
4154
SchemaBuilder.struct()
4255
.namespaced("cdc.ChangeEvent")
@@ -56,7 +69,7 @@ object ChangeEventExtensions {
5669
it.put("event", this.event.toConnectValue(schema.field("event").schema()))
5770
}
5871

59-
private fun Metadata.toConnectSchema(): Schema =
72+
internal fun Metadata.toConnectSchema(): Schema =
6073
SchemaBuilder.struct()
6174
.namespaced("cdc.Metadata")
6275
.field("authenticatedUser", SimpleTypes.STRING.schema())
@@ -68,15 +81,19 @@ object ChangeEventExtensions {
6881
.field("captureMode", SimpleTypes.STRING.schema())
6982
.field("txStartTime", SimpleTypes.ZONEDDATETIME.schema())
7083
.field("txCommitTime", SimpleTypes.ZONEDDATETIME.schema())
71-
.field("txMetadata", DynamicTypes.schemaFor(this.txMetadata, true).schema())
84+
.field(
85+
"txMetadata",
86+
DynamicTypes.toConnectSchema(
87+
this.txMetadata, optional = true, forceMapsAsStruct = true)
88+
.schema())
7289
.also {
7390
this.additionalEntries.forEach { entry ->
74-
it.field(entry.key, DynamicTypes.schemaFor(entry.value, true))
91+
it.field(entry.key, DynamicTypes.toConnectSchema(entry.value, true))
7592
}
7693
}
7794
.build()
7895

79-
private fun Metadata.toConnectValue(schema: Schema): Struct =
96+
internal fun Metadata.toConnectValue(schema: Schema): Struct =
8097
Struct(schema).also {
8198
it.put("authenticatedUser", this.authenticatedUser)
8299
it.put("executingUser", this.executingUser)
@@ -89,13 +106,17 @@ object ChangeEventExtensions {
89106
it.put("txCommitTime", DateTimeFormatter.ISO_DATE_TIME.format(this.txCommitTime))
90107
it.put(
91108
"txMetadata",
92-
DynamicTypes.valueFor(schema.field("txMetadata").schema(), this.txMetadata))
109+
DynamicTypes.toConnectValue(schema.field("txMetadata").schema(), this.txMetadata))
93110

94111
this.additionalEntries.forEach { entry ->
95-
it.put(entry.key, DynamicTypes.valueFor(schema.field(entry.key).schema(), entry.value))
112+
it.put(
113+
entry.key, DynamicTypes.toConnectValue(schema.field(entry.key).schema(), entry.value))
96114
}
97115
}
98116

117+
internal fun Struct.toMetadata(): Metadata =
118+
Metadata.fromMap(DynamicTypes.fromConnectValue(schema(), this) as Map<*, *>)
119+
99120
private fun Event.toConnectSchema(): Schema =
100121
when (val event = this) {
101122
is NodeEvent -> event.toConnectSchema()
@@ -112,7 +133,20 @@ object ChangeEventExtensions {
112133
else -> throw IllegalArgumentException("unsupported event type ${event.javaClass.name}")
113134
}
114135

115-
private fun NodeEvent.toConnectSchema(): Schema =
136+
private fun Struct.toEvent(): Event =
137+
when (val eventType = getString("eventType")) {
138+
EventType.NODE.name,
139+
EventType.NODE.shorthand -> {
140+
toNodeEvent()
141+
}
142+
EventType.RELATIONSHIP.name,
143+
EventType.RELATIONSHIP.shorthand -> {
144+
toRelationshipEvent()
145+
}
146+
else -> throw IllegalArgumentException("unsupported event type $eventType")
147+
}
148+
149+
internal fun NodeEvent.toConnectSchema(): Schema =
116150
SchemaBuilder.struct()
117151
.namespaced("cdc.NodeEvent")
118152
.field("elementId", SimpleTypes.STRING.schema())
@@ -123,14 +157,9 @@ object ChangeEventExtensions {
123157
.field("state", nodeStateSchema(before, after))
124158
.build()
125159

126-
private fun NodeEvent.toConnectValue(schema: Schema): Struct =
160+
internal fun NodeEvent.toConnectValue(schema: Schema): Struct =
127161
Struct(schema).also {
128-
val keys =
129-
if (this.keys.isEmpty()) {
130-
null
131-
} else {
132-
DynamicTypes.valueFor(schema.field("keys").schema(), this.keys)
133-
}
162+
val keys = DynamicTypes.toConnectValue(schema.field("keys").schema(), this.keys)
134163
it.put("elementId", this.elementId)
135164
it.put("eventType", this.eventType.name)
136165
it.put("operation", this.operation.name)
@@ -139,6 +168,20 @@ object ChangeEventExtensions {
139168
it.put("state", nodeStateValue(schema.field("state").schema(), this.before, this.after))
140169
}
141170

171+
@Suppress("UNCHECKED_CAST")
172+
internal fun Struct.toNodeEvent(): NodeEvent =
173+
getStruct("state").toNodeState().let { (before, after) ->
174+
NodeEvent(
175+
getString("elementId"),
176+
EntityOperation.valueOf(getString("operation")),
177+
getArray("labels"),
178+
DynamicTypes.fromConnectValue(
179+
schema().field("keys").schema(), get("keys"), skipNullValuesInMaps = true)
180+
as Map<String, List<MutableMap<String, Any>>>?,
181+
before,
182+
after)
183+
}
184+
142185
private fun nodeStateSchema(before: NodeState?, after: NodeState?): Schema {
143186
val stateSchema =
144187
SchemaBuilder.struct()
@@ -155,7 +198,7 @@ object ChangeEventExtensions {
155198
(before?.properties ?: mapOf()) + (after?.properties ?: mapOf())
156199
combinedProperties.toSortedMap().forEach { entry ->
157200
if (it.field(entry.key) == null) {
158-
it.field(entry.key, DynamicTypes.schemaFor(entry.value, true))
201+
it.field(entry.key, DynamicTypes.toConnectSchema(entry.value, true))
159202
}
160203
}
161204
}
@@ -180,7 +223,7 @@ object ChangeEventExtensions {
180223
it.put("labels", before.labels)
181224
it.put(
182225
"properties",
183-
DynamicTypes.valueFor(
226+
DynamicTypes.toConnectValue(
184227
it.schema().field("properties").schema(), before.properties))
185228
})
186229
}
@@ -192,13 +235,33 @@ object ChangeEventExtensions {
192235
it.put("labels", after.labels)
193236
it.put(
194237
"properties",
195-
DynamicTypes.valueFor(
238+
DynamicTypes.toConnectValue(
196239
it.schema().field("properties").schema(), after.properties))
197240
})
198241
}
199242
}
200243

201-
private fun RelationshipEvent.toConnectSchema(): Schema =
244+
@Suppress("UNCHECKED_CAST")
245+
internal fun Struct.toNodeState(): Pair<NodeState?, NodeState?> =
246+
Pair(
247+
getStruct("before")?.let {
248+
val labels = it.getArray<String>("labels")
249+
val properties = it.getStruct("properties")
250+
NodeState(
251+
labels,
252+
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
253+
as Map<String, Any?>)
254+
},
255+
getStruct("after")?.let {
256+
val labels = it.getArray<String>("labels")
257+
val properties = it.getStruct("properties")
258+
NodeState(
259+
labels,
260+
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
261+
as Map<String, Any?>)
262+
})
263+
264+
internal fun RelationshipEvent.toConnectSchema(): Schema =
202265
SchemaBuilder.struct()
203266
.namespaced("cdc.RelationshipEvent")
204267
.field("elementId", SimpleTypes.STRING.schema())
@@ -211,14 +274,9 @@ object ChangeEventExtensions {
211274
.field("state", relationshipStateSchema(this.before, this.after))
212275
.build()
213276

214-
private fun RelationshipEvent.toConnectValue(schema: Schema): Struct =
277+
internal fun RelationshipEvent.toConnectValue(schema: Schema): Struct =
215278
Struct(schema).also {
216-
val keys =
217-
if (this.keys.isEmpty()) {
218-
null
219-
} else {
220-
DynamicTypes.valueFor(schema.field("keys").schema(), this.keys)
221-
}
279+
val keys = DynamicTypes.toConnectValue(schema.field("keys").schema(), this.keys)
222280
it.put("elementId", this.elementId)
223281
it.put("eventType", this.eventType.name)
224282
it.put("operation", this.operation.name)
@@ -231,7 +289,23 @@ object ChangeEventExtensions {
231289
relationshipStateValue(schema.field("state").schema(), this.before, this.after))
232290
}
233291

234-
private fun Node.toConnectSchema(): Schema {
292+
@Suppress("UNCHECKED_CAST")
293+
internal fun Struct.toRelationshipEvent(): RelationshipEvent =
294+
getStruct("state").toRelationshipState().let { (before, after) ->
295+
RelationshipEvent(
296+
getString("elementId"),
297+
getString("type"),
298+
getStruct("start").toNode(),
299+
getStruct("end").toNode(),
300+
DynamicTypes.fromConnectValue(
301+
schema().field("keys").schema(), get("keys"), skipNullValuesInMaps = true)
302+
as List<Map<String, Any>>?,
303+
EntityOperation.valueOf(getString("operation")),
304+
before,
305+
after)
306+
}
307+
308+
internal fun Node.toConnectSchema(): Schema {
235309
return SchemaBuilder.struct()
236310
.namespaced("cdc.Node")
237311
.field("elementId", SimpleTypes.STRING.schema())
@@ -240,13 +314,22 @@ object ChangeEventExtensions {
240314
.build()
241315
}
242316

243-
private fun Node.toConnectValue(schema: Schema): Struct =
317+
internal fun Node.toConnectValue(schema: Schema): Struct =
244318
Struct(schema).also {
245319
it.put("elementId", this.elementId)
246320
it.put("labels", this.labels)
247-
it.put("keys", DynamicTypes.valueFor(schema.field("keys").schema(), this.keys))
321+
it.put("keys", DynamicTypes.toConnectValue(schema.field("keys").schema(), this.keys))
248322
}
249323

324+
@Suppress("UNCHECKED_CAST")
325+
internal fun Struct.toNode(): Node =
326+
Node(
327+
this.getString("elementId"),
328+
this.getArray("labels"),
329+
DynamicTypes.fromConnectValue(
330+
schema().field("keys").schema(), this.get("keys"), skipNullValuesInMaps = true)
331+
as Map<String, List<Map<String, Any>>>)
332+
250333
private fun relationshipStateSchema(
251334
before: RelationshipState?,
252335
after: RelationshipState?
@@ -265,7 +348,7 @@ object ChangeEventExtensions {
265348
(before?.properties ?: mapOf()) + (after?.properties ?: mapOf())
266349
combinedProperties.toSortedMap().forEach { entry ->
267350
if (it.field(entry.key) == null) {
268-
it.field(entry.key, DynamicTypes.schemaFor(entry.value, true))
351+
it.field(entry.key, DynamicTypes.toConnectSchema(entry.value, true))
269352
}
270353
}
271354
}
@@ -293,7 +376,7 @@ object ChangeEventExtensions {
293376
Struct(this.schema().field("before").schema()).also {
294377
it.put(
295378
"properties",
296-
DynamicTypes.valueFor(
379+
DynamicTypes.toConnectValue(
297380
it.schema().field("properties").schema(), before.properties))
298381
})
299382
}
@@ -304,27 +387,48 @@ object ChangeEventExtensions {
304387
Struct(this.schema().field("after").schema()).also {
305388
it.put(
306389
"properties",
307-
DynamicTypes.valueFor(
390+
DynamicTypes.toConnectValue(
308391
it.schema().field("properties").schema(), after.properties))
309392
})
310393
}
311394
}
312395

313-
private fun schemaForKeysByLabel(keys: Map<String, List<Map<String, Any>>>): Schema {
396+
@Suppress("UNCHECKED_CAST")
397+
internal fun Struct.toRelationshipState(): Pair<RelationshipState?, RelationshipState?> =
398+
Pair(
399+
getStruct("before")?.let {
400+
val properties = it.getStruct("properties")
401+
RelationshipState(
402+
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
403+
as Map<String, Any?>)
404+
},
405+
getStruct("after")?.let {
406+
val properties = it.getStruct("properties")
407+
RelationshipState(
408+
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
409+
as Map<String, Any?>)
410+
})
411+
412+
private fun schemaForKeysByLabel(keys: Map<String, List<Map<String, Any>>>?): Schema {
314413
return SchemaBuilder.struct()
315-
.apply { keys.forEach { field(it.key, schemaForKeys(it.value)) } }
414+
.apply { keys?.forEach { field(it.key, schemaForKeys(it.value)) } }
316415
.optional()
317416
.build()
318417
}
319418

320-
private fun schemaForKeys(keys: List<Map<String, Any>>): Schema {
419+
private fun schemaForKeys(keys: List<Map<String, Any>>?): Schema {
321420
return SchemaBuilder.array(
322421
// We need to define a uniform structure of key array elements. Because all elements
323422
// must have identical structure, we list all available keys as optional fields.
324423
SchemaBuilder.struct()
325424
.apply {
326-
keys.forEach { key ->
327-
key.forEach { field(it.key, DynamicTypes.schemaFor(it.value, true)) }
425+
keys?.forEach { key ->
426+
key.forEach {
427+
field(
428+
it.key,
429+
DynamicTypes.toConnectSchema(
430+
it.value, optional = true, forceMapsAsStruct = true))
431+
}
328432
}
329433
}
330434
.optional()

0 commit comments

Comments
 (0)