@@ -31,9 +31,10 @@ import org.neo4j.cdc.client.model.NodeEvent
3131import org.neo4j.cdc.client.model.NodeState
3232import org.neo4j.cdc.client.model.RelationshipEvent
3333import org.neo4j.cdc.client.model.RelationshipState
34+ import org.neo4j.connectors.kafka.configuration.PayloadMode
3435import org.neo4j.connectors.kafka.data.DynamicTypes.toConnectSchema
3536
36- class ChangeEventConverter () {
37+ class ChangeEventConverter (private val payloadMode : PayloadMode = PayloadMode . EXTENDED ) {
3738
3839 fun toConnectValue (changeEvent : ChangeEvent ): SchemaAndValue {
3940 val schema = toConnectSchema(changeEvent)
@@ -69,15 +70,22 @@ class ChangeEventConverter() {
6970 .field(" connectionServer" , Schema .OPTIONAL_STRING_SCHEMA )
7071 .field(" serverId" , Schema .STRING_SCHEMA )
7172 .field(" captureMode" , Schema .STRING_SCHEMA )
72- .field(" txStartTime" , PropertyType .schema)
73- .field(" txCommitTime" , PropertyType .schema)
73+ .field(
74+ " txStartTime" ,
75+ if (payloadMode == PayloadMode .EXTENDED ) PropertyType .schema
76+ else SimpleTypes .ZONEDDATETIME .schema)
77+ .field(
78+ " txCommitTime" ,
79+ if (payloadMode == PayloadMode .EXTENDED ) PropertyType .schema
80+ else SimpleTypes .ZONEDDATETIME .schema)
7481 .field(
7582 " txMetadata" ,
76- toConnectSchema(metadata.txMetadata, optional = true , forceMapsAsStruct = true )
83+ toConnectSchema(
84+ payloadMode, metadata.txMetadata, optional = true , forceMapsAsStruct = true )
7785 .schema())
7886 .also {
7987 metadata.additionalEntries.forEach { entry ->
80- it.field(entry.key, toConnectSchema(entry.value, optional = true ))
88+ it.field(entry.key, toConnectSchema(payloadMode, entry.value, optional = true ))
8189 }
8290 }
8391 .build()
@@ -92,9 +100,17 @@ class ChangeEventConverter() {
92100 it.put(" serverId" , metadata.serverId)
93101 it.put(" captureMode" , metadata.captureMode.name)
94102 it.put(
95- " txStartTime" , DynamicTypes .toConnectValue(PropertyType .schema, metadata.txStartTime))
103+ " txStartTime" ,
104+ DynamicTypes .toConnectValue(
105+ if (payloadMode == PayloadMode .EXTENDED ) PropertyType .schema
106+ else SimpleTypes .ZONEDDATETIME .schema,
107+ metadata.txStartTime))
96108 it.put(
97- " txCommitTime" , DynamicTypes .toConnectValue(PropertyType .schema, metadata.txCommitTime))
109+ " txCommitTime" ,
110+ DynamicTypes .toConnectValue(
111+ if (payloadMode == PayloadMode .EXTENDED ) PropertyType .schema
112+ else SimpleTypes .ZONEDDATETIME .schema,
113+ metadata.txCommitTime))
98114 it.put(
99115 " txMetadata" ,
100116 DynamicTypes .toConnectValue(schema.field(" txMetadata" ).schema(), metadata.txMetadata))
@@ -214,7 +230,8 @@ class ChangeEventConverter() {
214230 if (addedFields.add(it.key)) {
215231 field(
216232 it.key,
217- toConnectSchema(it.value, optional = true , forceMapsAsStruct = true ))
233+ toConnectSchema(
234+ payloadMode, it.value, optional = true , forceMapsAsStruct = true ))
218235 }
219236 }
220237 }
@@ -232,7 +249,22 @@ class ChangeEventConverter() {
232249 this .field(" labels" , SchemaBuilder .array(Schema .STRING_SCHEMA ).build())
233250 this .field(
234251 " properties" ,
235- SchemaBuilder .map(Schema .STRING_SCHEMA , PropertyType .schema).build())
252+ if (payloadMode == PayloadMode .EXTENDED )
253+ SchemaBuilder .map(Schema .STRING_SCHEMA , PropertyType .schema).build()
254+ else
255+ SchemaBuilder .struct()
256+ .also {
257+ val combinedProperties =
258+ (before?.properties ? : mapOf ()) + (after?.properties ? : mapOf ())
259+ combinedProperties.toSortedMap().forEach { entry ->
260+ if (it.field(entry.key) == null ) {
261+ it.field(
262+ entry.key,
263+ toConnectSchema(payloadMode, entry.value, optional = true ))
264+ }
265+ }
266+ }
267+ .build())
236268 }
237269 .optional()
238270 .build()
@@ -249,9 +281,13 @@ class ChangeEventConverter() {
249281 it.put(" labels" , before.labels)
250282 it.put(
251283 " properties" ,
252- before.properties.mapValues { e ->
253- DynamicTypes .toConnectValue(PropertyType .schema, e.value)
254- })
284+ if (payloadMode == PayloadMode .EXTENDED )
285+ before.properties.mapValues { e ->
286+ DynamicTypes .toConnectValue(PropertyType .schema, e.value)
287+ }
288+ else
289+ DynamicTypes .toConnectValue(
290+ it.schema().field(" properties" ).schema(), before.properties))
255291 })
256292 }
257293
@@ -262,9 +298,13 @@ class ChangeEventConverter() {
262298 it.put(" labels" , after.labels)
263299 it.put(
264300 " properties" ,
265- after.properties.mapValues { e ->
266- DynamicTypes .toConnectValue(PropertyType .schema, e.value)
267- })
301+ if (payloadMode == PayloadMode .EXTENDED )
302+ after.properties.mapValues { e ->
303+ DynamicTypes .toConnectValue(PropertyType .schema, e.value)
304+ }
305+ else
306+ DynamicTypes .toConnectValue(
307+ it.schema().field(" properties" ).schema(), after.properties))
268308 })
269309 }
270310 }
@@ -278,7 +318,22 @@ class ChangeEventConverter() {
278318 .apply {
279319 this .field(
280320 " properties" ,
281- SchemaBuilder .map(Schema .STRING_SCHEMA , PropertyType .schema).build())
321+ if (payloadMode == PayloadMode .EXTENDED )
322+ SchemaBuilder .map(Schema .STRING_SCHEMA , PropertyType .schema).build()
323+ else
324+ SchemaBuilder .struct()
325+ .also {
326+ val combinedProperties =
327+ (before?.properties ? : mapOf ()) + (after?.properties ? : mapOf ())
328+ combinedProperties.toSortedMap().forEach { entry ->
329+ if (it.field(entry.key) == null ) {
330+ it.field(
331+ entry.key,
332+ toConnectSchema(payloadMode, entry.value, optional = true ))
333+ }
334+ }
335+ }
336+ .build())
282337 }
283338 .optional()
284339 .build()
@@ -298,9 +353,13 @@ class ChangeEventConverter() {
298353 Struct (this .schema().field(" before" ).schema()).also {
299354 it.put(
300355 " properties" ,
301- before.properties.mapValues { e ->
302- DynamicTypes .toConnectValue(PropertyType .schema, e.value)
303- })
356+ if (payloadMode == PayloadMode .EXTENDED )
357+ before.properties.mapValues { e ->
358+ DynamicTypes .toConnectValue(PropertyType .schema, e.value)
359+ }
360+ else
361+ DynamicTypes .toConnectValue(
362+ it.schema().field(" properties" ).schema(), before.properties))
304363 })
305364 }
306365
@@ -310,9 +369,13 @@ class ChangeEventConverter() {
310369 Struct (this .schema().field(" after" ).schema()).also {
311370 it.put(
312371 " properties" ,
313- after.properties.mapValues { e ->
314- DynamicTypes .toConnectValue(PropertyType .schema, e.value)
315- })
372+ if (payloadMode == PayloadMode .EXTENDED )
373+ after.properties.mapValues { e ->
374+ DynamicTypes .toConnectValue(PropertyType .schema, e.value)
375+ }
376+ else
377+ DynamicTypes .toConnectValue(
378+ it.schema().field(" properties" ).schema(), after.properties))
316379 })
317380 }
318381 }
@@ -396,44 +459,68 @@ internal fun Struct.toRelationshipEvent(): RelationshipEvent =
396459 after)
397460 }
398461
399- @Suppress(" UNCHECKED_CAST" )
462+ @Suppress(" UNCHECKED_CAST" , " IMPLICIT_CAST_TO_ANY " )
400463internal fun Struct.toNodeState (): Pair <NodeState ?, NodeState ?> =
401464 Pair (
402465 getStruct(" before" )?.let {
403466 val labels = it.getArray<String >(" labels" )
404- val properties = it.getMap<String , Any ?>(" properties" )
467+ val propertiesField = it.schema().field(" properties" )
468+ val properties =
469+ when (propertiesField.schema().type()) {
470+ Schema .Type .MAP -> it.getMap<String , Any ?>(" properties" )
471+ Schema .Type .STRUCT -> it.getStruct(" properties" )
472+ else -> throw IllegalArgumentException (" Unsupported schema type for properties" )
473+ }
405474 NodeState (
406475 labels,
407- DynamicTypes .fromConnectValue(
408- it.schema().field( " properties " ).schema(), properties, true ) as Map <String , Any ?>,
476+ DynamicTypes .fromConnectValue(propertiesField.schema(), properties, true )
477+ as Map <String , Any ?>,
409478 )
410479 },
411480 getStruct(" after" )?.let {
412481 val labels = it.getArray<String >(" labels" )
413- val properties = it.getMap<String , Any ?>(" properties" )
482+ val propertiesField = it.schema().field(" properties" )
483+ val properties =
484+ when (propertiesField.schema().type()) {
485+ Schema .Type .MAP -> it.getMap<String , Any ?>(" properties" )
486+ Schema .Type .STRUCT -> it.getStruct(" properties" )
487+ else -> throw IllegalArgumentException (" Unsupported schema type for properties" )
488+ }
414489 NodeState (
415490 labels,
416- DynamicTypes .fromConnectValue(
417- it.schema().field( " properties " ).schema(), properties, true ) as Map <String , Any ?>,
491+ DynamicTypes .fromConnectValue(propertiesField.schema(), properties, true )
492+ as Map <String , Any ?>,
418493 )
419494 },
420495 )
421496
422- @Suppress(" UNCHECKED_CAST" )
497+ @Suppress(" UNCHECKED_CAST" , " IMPLICIT_CAST_TO_ANY " )
423498internal fun Struct.toRelationshipState (): Pair <RelationshipState ?, RelationshipState ?> =
424499 Pair (
425500 getStruct(" before" )?.let {
426- val properties = it.getMap<String , Any ?>(" properties" )
501+ val propertiesField = it.schema().field(" properties" )
502+ val properties =
503+ when (propertiesField.schema().type()) {
504+ Schema .Type .MAP -> it.getMap<String , Any ?>(" properties" )
505+ Schema .Type .STRUCT -> it.getStruct(" properties" )
506+ else -> throw IllegalArgumentException (" Unsupported schema type for properties" )
507+ }
427508 RelationshipState (
428- DynamicTypes .fromConnectValue(
429- it.schema().field( " properties " ).schema(), properties, true ) as Map <String , Any ?>,
509+ DynamicTypes .fromConnectValue(propertiesField.schema(), properties, true )
510+ as Map <String , Any ?>,
430511 )
431512 },
432513 getStruct(" after" )?.let {
433- val properties = it.getMap<String , Any ?>(" properties" )
514+ val propertiesField = it.schema().field(" properties" )
515+ val properties =
516+ when (propertiesField.schema().type()) {
517+ Schema .Type .MAP -> it.getMap<String , Any ?>(" properties" )
518+ Schema .Type .STRUCT -> it.getStruct(" properties" )
519+ else -> throw IllegalArgumentException (" Unsupported schema type for properties" )
520+ }
434521 RelationshipState (
435- DynamicTypes .fromConnectValue(
436- it.schema().field( " properties " ).schema(), properties, true ) as Map <String , Any ?>,
522+ DynamicTypes .fromConnectValue(propertiesField.schema(), properties, true )
523+ as Map <String , Any ?>,
437524 )
438525 },
439526 )
0 commit comments