Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object ChangeEventExtensions {
.field("eventType", SimpleTypes.STRING.schema())
.field("operation", SimpleTypes.STRING.schema())
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
.field("keys", schemaForKeys(this.keys))
.field("keys", schemaForKeysByLabel(this.keys))
.field(
"state",
SchemaBuilder.struct()
Expand Down Expand Up @@ -154,7 +154,7 @@ object ChangeEventExtensions {
.field("type", SimpleTypes.STRING.schema())
.field("start", this.start.toConnectSchema())
.field("end", this.end.toConnectSchema())
.field("key", schemaForKey(this.key))
.field("keys", schemaForKeys(this.keys))
.field(
"state",
SchemaBuilder.struct()
Expand All @@ -172,7 +172,7 @@ object ChangeEventExtensions {
it.put("type", this.type)
it.put("start", this.start.toConnectValue(schema.field("start").schema()))
it.put("end", this.end.toConnectValue(schema.field("end").schema()))
it.put("key", DynamicTypes.valueFor(schema.field("key").schema(), this.key))
it.put("keys", DynamicTypes.valueFor(schema.field("keys").schema(), this.keys))
it.put(
"state",
schema.field("state").schema().let { stateSchema ->
Expand Down Expand Up @@ -225,7 +225,7 @@ object ChangeEventExtensions {
.namespaced("cdc.Node")
.field("elementId", SimpleTypes.STRING.schema())
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
.field("keys", DynamicTypes.schemaFor(this.keys, true))
.field("keys", schemaForKeysByLabel(this.keys))
.build()
}

Expand Down Expand Up @@ -271,16 +271,25 @@ object ChangeEventExtensions {
}
}

private fun schemaForKeys(keys: Map<String, Map<String, Any>>): Schema {
private fun schemaForKeysByLabel(keys: Map<String, List<Map<String, Any>>>): Schema {
return SchemaBuilder.struct()
.apply { keys.forEach { field(it.key, schemaForKey(it.value)) } }
.apply { keys.forEach { field(it.key, schemaForKeys(it.value)) } }
.optional()
.build()
}

private fun schemaForKey(key: Map<String, Any>): Schema {
return SchemaBuilder.struct()
.apply { key.forEach { field(it.key, DynamicTypes.schemaFor(it.value, true)) } }
private fun schemaForKeys(keys: List<Map<String, Any>>): Schema {
return SchemaBuilder.array(
// We need to define a uniform structure of key array elements. Because all elements
// must have identical structure, we list all available keys as optional fields.
SchemaBuilder.struct()
.apply {
keys.forEach { key ->
key.forEach { field(it.key, DynamicTypes.schemaFor(it.value, true)) }
}
}
.optional()
.build())
Comment on lines +285 to +292
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got a bit confused about having keys as an array of structs; it is probably worth adding a comment about this.

Copy link
Contributor Author

@venikkin venikkin Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Before the change, each key (e.g. a Map<String, Object>) was mapped to a struct (a struct of structs when keys are grouped per label). Now we have multiple keys, so I just wrapped the struct representing a key in an array. However, to reflect that each element of the array can have a different structure, we have to define all possible key names in the struct. For example: [ { "id": 1 }, { "name": "Joe" } ] corresponds to

array(struct { 
  optional int id,
  optional string name 
  })

The fields are optional because id is null in the second element and name is null in the first element. For Avro, the value will look like this (pseudo-Avro here, just to show the structure): [{id: 1, name: null}, {id: null, name: "Joe"}].
An alternative option would be something like

  array(struct {
     string key, 
     optional int intValue,
     optional string stringValue
  })

with the corresponding value [{"key": "id", "intValue": 1, "stringValue": null}, {"key": "name", "intValue": null, "stringValue": "Joe"}]
Maybe there could be a better option. Let's discuss.

.optional()
.build()
}
Expand Down
Loading