Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import org.neo4j.cdc.client.model.RelationshipEvent
import org.neo4j.cdc.client.model.RelationshipState
import org.neo4j.connectors.kafka.data.DynamicTypes.toConnectSchema

class ChangeEventConverter(
val temporalDataSchemaType: TemporalDataSchemaType = TemporalDataSchemaType.STRUCT,
) {
class ChangeEventConverter() {

fun toConnectValue(changeEvent: ChangeEvent): SchemaAndValue {
val schema = toConnectSchema(changeEvent)
Expand All @@ -44,9 +42,9 @@ class ChangeEventConverter(

private fun toConnectSchema(changeEvent: ChangeEvent): Schema =
SchemaBuilder.struct()
.field("id", SimpleTypes.STRING.schema())
.field("txId", SimpleTypes.LONG.schema())
.field("seq", SimpleTypes.LONG.schema())
.field("id", Schema.STRING_SCHEMA)
.field("txId", Schema.INT64_SCHEMA)
.field("seq", Schema.INT64_SCHEMA)
.field("metadata", metadataToConnectSchema(changeEvent.metadata))
.field("event", eventToConnectSchema(changeEvent.event))
.build()
Expand All @@ -64,31 +62,22 @@ class ChangeEventConverter(

internal fun metadataToConnectSchema(metadata: Metadata): Schema =
SchemaBuilder.struct()
.field("authenticatedUser", SimpleTypes.STRING.schema())
.field("executingUser", SimpleTypes.STRING.schema())
.field("connectionType", SimpleTypes.STRING.schema(true))
.field("connectionClient", SimpleTypes.STRING.schema(true))
.field("connectionServer", SimpleTypes.STRING.schema(true))
.field("serverId", SimpleTypes.STRING.schema())
.field("captureMode", SimpleTypes.STRING.schema())
.field("txStartTime", SimpleTypes.ZONEDDATETIME_STRUCT.schema())
.field("txCommitTime", SimpleTypes.ZONEDDATETIME_STRUCT.schema())
.field("authenticatedUser", Schema.STRING_SCHEMA)
.field("executingUser", Schema.STRING_SCHEMA)
.field("connectionType", Schema.OPTIONAL_STRING_SCHEMA)
.field("connectionClient", Schema.OPTIONAL_STRING_SCHEMA)
.field("connectionServer", Schema.OPTIONAL_STRING_SCHEMA)
.field("serverId", Schema.STRING_SCHEMA)
.field("captureMode", Schema.STRING_SCHEMA)
.field("txStartTime", PropertyType.schema)
.field("txCommitTime", PropertyType.schema)
.field(
"txMetadata",
toConnectSchema(
metadata.txMetadata,
optional = true,
forceMapsAsStruct = true,
temporalDataSchemaType = temporalDataSchemaType)
toConnectSchema(metadata.txMetadata, optional = true, forceMapsAsStruct = true)
.schema())
.also {
metadata.additionalEntries.forEach { entry ->
it.field(
entry.key,
toConnectSchema(
entry.value,
optional = true,
temporalDataSchemaType = temporalDataSchemaType))
it.field(entry.key, toConnectSchema(entry.value, optional = true))
}
}
.build()
Expand All @@ -103,13 +92,9 @@ class ChangeEventConverter(
it.put("serverId", metadata.serverId)
it.put("captureMode", metadata.captureMode.name)
it.put(
"txStartTime",
DynamicTypes.toConnectValue(
SimpleTypes.ZONEDDATETIME_STRUCT.schema(), metadata.txStartTime))
"txStartTime", DynamicTypes.toConnectValue(PropertyType.schema, metadata.txStartTime))
it.put(
"txCommitTime",
DynamicTypes.toConnectValue(
SimpleTypes.ZONEDDATETIME_STRUCT.schema(), metadata.txCommitTime))
"txCommitTime", DynamicTypes.toConnectValue(PropertyType.schema, metadata.txCommitTime))
it.put(
"txMetadata",
DynamicTypes.toConnectValue(schema.field("txMetadata").schema(), metadata.txMetadata))
Expand Down Expand Up @@ -138,17 +123,18 @@ class ChangeEventConverter(

internal fun nodeEventToConnectSchema(nodeEvent: NodeEvent): Schema =
SchemaBuilder.struct()
.field("elementId", SimpleTypes.STRING.schema())
.field("eventType", SimpleTypes.STRING.schema())
.field("operation", SimpleTypes.STRING.schema())
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
.field("elementId", Schema.STRING_SCHEMA)
.field("eventType", Schema.STRING_SCHEMA)
.field("operation", Schema.STRING_SCHEMA)
.field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("keys", schemaForKeysByLabel(nodeEvent.keys))
.field("state", nodeStateSchema(nodeEvent.before, nodeEvent.after))
.build()

internal fun nodeEventToConnectValue(nodeEvent: NodeEvent, schema: Schema): Struct =
Struct(schema).also {
val keys = DynamicTypes.toConnectValue(schema.field("keys").schema(), nodeEvent.keys)

it.put("elementId", nodeEvent.elementId)
it.put("eventType", nodeEvent.eventType.name)
it.put("operation", nodeEvent.operation.name)
Expand All @@ -161,10 +147,10 @@ class ChangeEventConverter(

internal fun relationshipEventToConnectSchema(relationshipEvent: RelationshipEvent): Schema =
SchemaBuilder.struct()
.field("elementId", SimpleTypes.STRING.schema())
.field("eventType", SimpleTypes.STRING.schema())
.field("operation", SimpleTypes.STRING.schema())
.field("type", SimpleTypes.STRING.schema())
.field("elementId", Schema.STRING_SCHEMA)
.field("eventType", Schema.STRING_SCHEMA)
.field("operation", Schema.STRING_SCHEMA)
.field("type", Schema.STRING_SCHEMA)
.field("start", nodeToConnectSchema(relationshipEvent.start))
.field("end", nodeToConnectSchema(relationshipEvent.end))
.field("keys", schemaForKeys(relationshipEvent.keys))
Expand All @@ -179,6 +165,7 @@ class ChangeEventConverter(
Struct(schema).also {
val keys =
DynamicTypes.toConnectValue(schema.field("keys").schema(), relationshipEvent.keys)

it.put("elementId", relationshipEvent.elementId)
it.put("eventType", relationshipEvent.eventType.name)
it.put("operation", relationshipEvent.operation.name)
Expand All @@ -194,8 +181,8 @@ class ChangeEventConverter(

internal fun nodeToConnectSchema(node: Node): Schema {
return SchemaBuilder.struct()
.field("elementId", SimpleTypes.STRING.schema())
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
.field("elementId", Schema.STRING_SCHEMA)
.field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("keys", schemaForKeysByLabel(node.keys))
.build()
}
Expand Down Expand Up @@ -224,11 +211,8 @@ class ChangeEventConverter(
key.forEach {
field(
it.key,
toConnectSchema(
it.value,
optional = true,
forceMapsAsStruct = true,
temporalDataSchemaType = temporalDataSchemaType))
DynamicTypes.toConnectSchema(
it.value, optional = true, forceMapsAsStruct = true))
}
}
}
Expand All @@ -242,27 +226,10 @@ class ChangeEventConverter(
val stateSchema =
SchemaBuilder.struct()
.apply {
this.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
this.field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
this.field(
"properties",
SchemaBuilder.struct()
.also {
// TODO: should we check for incompatible types for the existing value,
// and what happens in that case?
val combinedProperties =
(before?.properties ?: mapOf()) + (after?.properties ?: mapOf())
combinedProperties.toSortedMap().forEach { entry ->
if (it.field(entry.key) == null) {
it.field(
entry.key,
toConnectSchema(
entry.value,
optional = true,
temporalDataSchemaType = temporalDataSchemaType))
}
}
}
.build())
SchemaBuilder.map(Schema.STRING_SCHEMA, PropertyType.schema).build())
}
.optional()
.build()
Expand All @@ -279,8 +246,9 @@ class ChangeEventConverter(
it.put("labels", before.labels)
it.put(
"properties",
DynamicTypes.toConnectValue(
it.schema().field("properties").schema(), before.properties))
before.properties.mapValues { e ->
DynamicTypes.toConnectValue(PropertyType.schema, e.value)
})
})
}

Expand All @@ -291,8 +259,9 @@ class ChangeEventConverter(
it.put("labels", after.labels)
it.put(
"properties",
DynamicTypes.toConnectValue(
it.schema().field("properties").schema(), after.properties))
after.properties.mapValues { e ->
DynamicTypes.toConnectValue(PropertyType.schema, e.value)
})
})
}
}
Expand All @@ -306,24 +275,7 @@ class ChangeEventConverter(
.apply {
this.field(
"properties",
SchemaBuilder.struct()
.also {
// TODO: should we check for incompatible types for the existing value,
// and what happens in that case?
val combinedProperties =
(before?.properties ?: mapOf()) + (after?.properties ?: mapOf())
combinedProperties.toSortedMap().forEach { entry ->
if (it.field(entry.key) == null) {
it.field(
entry.key,
toConnectSchema(
entry.value,
optional = true,
temporalDataSchemaType = temporalDataSchemaType))
}
}
}
.build())
SchemaBuilder.map(Schema.STRING_SCHEMA, PropertyType.schema).build())
}
.optional()
.build()
Expand All @@ -343,8 +295,9 @@ class ChangeEventConverter(
Struct(this.schema().field("before").schema()).also {
it.put(
"properties",
DynamicTypes.toConnectValue(
it.schema().field("properties").schema(), before.properties))
before.properties.mapValues { e ->
DynamicTypes.toConnectValue(PropertyType.schema, e.value)
})
})
}

Expand All @@ -354,8 +307,9 @@ class ChangeEventConverter(
Struct(this.schema().field("after").schema()).also {
it.put(
"properties",
DynamicTypes.toConnectValue(
it.schema().field("properties").schema(), after.properties))
after.properties.mapValues { e ->
DynamicTypes.toConnectValue(PropertyType.schema, e.value)
})
})
}
}
Expand Down Expand Up @@ -424,20 +378,20 @@ internal fun Struct.toNodeState(): Pair<NodeState?, NodeState?> =
Pair(
getStruct("before")?.let {
val labels = it.getArray<String>("labels")
val properties = it.getStruct("properties")
val properties = it.getMap<String, Any?>("properties")
NodeState(
labels,
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
as Map<String, Any?>,
DynamicTypes.fromConnectValue(
it.schema().field("properties").schema(), properties, true) as Map<String, Any?>,
)
},
getStruct("after")?.let {
val labels = it.getArray<String>("labels")
val properties = it.getStruct("properties")
val properties = it.getMap<String, Any?>("properties")
NodeState(
labels,
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
as Map<String, Any?>,
DynamicTypes.fromConnectValue(
it.schema().field("properties").schema(), properties, true) as Map<String, Any?>,
)
},
)
Expand All @@ -446,17 +400,17 @@ internal fun Struct.toNodeState(): Pair<NodeState?, NodeState?> =
internal fun Struct.toRelationshipState(): Pair<RelationshipState?, RelationshipState?> =
Pair(
getStruct("before")?.let {
val properties = it.getStruct("properties")
val properties = it.getMap<String, Any?>("properties")
RelationshipState(
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
as Map<String, Any?>,
DynamicTypes.fromConnectValue(
it.schema().field("properties").schema(), properties, true) as Map<String, Any?>,
)
},
getStruct("after")?.let {
val properties = it.getStruct("properties")
val properties = it.getMap<String, Any?>("properties")
RelationshipState(
DynamicTypes.fromConnectValue(properties.schema(), properties, true)
as Map<String, Any?>,
DynamicTypes.fromConnectValue(
it.schema().field("properties").schema(), properties, true) as Map<String, Any?>,
)
},
)
Expand Down
Loading