Skip to content

Commit f9fc057

Browse files
authored
fix: handle heterogeneous object list (#349)
Fixes #343 This applies only to the source connector, with the query strategy. Note that maps with heterogeneous value types will still be encoded as structs. Some converters (like Protobuf) do not support map message types by default.
1 parent 851ce76 commit f9fc057

File tree

9 files changed

+89
-5
lines changed

9 files changed

+89
-5
lines changed

common/src/main/resources/neo4j-source-configuration.properties

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
neo4j.source-strategy=Type: Enum<QUERY, CDC>;\nDescription: Source strategy for this connector.
1515
neo4j.batch-size=Type: Integer;\nDescription: Maximum number of change events to publish for each poll cycle.
1616
neo4j.start-from=Type: Enum<EARLIEST, NOW, USER_PROVIDED>;\nDescription: A time anchor to start streaming from.
17-
neo4j.start-from.value=Type: STRING|LONG;\nDescription: Custom value to use as a starting offset. \
17+
neo4j.start-from.value=Type: String|Long;\nDescription: Custom value to use as a starting offset. \
1818
Used once during the initial run of the connector, and will be ignored if there is an offset stored in Kafka Connect.
1919
neo4j.ignore-stored-offset=Type: Boolean;\nDescription: Whether to ignore any offset value retrieved from the offset storage saved by a previous run.
2020
neo4j.query=Type: String;\nDescription: Cypher query to use for gathering changes. \
@@ -24,6 +24,8 @@ neo4j.query.streaming-property=Type: String;\nDescription: Property name that is
2424
neo4j.query.poll-duration=Type: String;\nDescription: Maximum amount of time Kafka Connect poll request will wait for a change to be received from the database (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
2525
neo4j.query.poll-interval=Type: String;\nDescription: Interval in which the query is executed during `neo4j.query.poll-duration` timespan (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
2626
neo4j.query.topic=Type: String;\nDescription: Kafka topic to publish change events gathered through provided query.
27+
neo4j.query.force-maps-as-struct=Type: Boolean;\nDescription: Whether the schema of maps with homogeneous value types is encoded as struct or map (default is: `true`). \
28+
Not all converters support map schemas.
2729
neo4j.query.timeout=Type: Duration;\nDescription: Maximum amount of time query is allowed to run (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
2830
neo4j.cdc.poll-duration=Type: Duration;\nDescription: Maximum amount of time Kafka Connect poll request will wait for a change to be received from the database (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
2931
neo4j.cdc.poll-interval=Type: Duration;\nDescription: The interval in which the database will be polled for changes (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceQueryIT.kt

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.neo4j.connectors.kafka.testing.TestSupport.runTest
2727
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
2828
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.AVRO
2929
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_EMBEDDED
30+
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_RAW
3031
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_SCHEMA
3132
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.PROTOBUF
3233
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -39,7 +40,7 @@ import org.neo4j.driver.Session
3940
abstract class Neo4jSourceQueryIT {
4041

4142
companion object {
42-
private const val TOPIC = "neo4j-source-topic"
43+
const val TOPIC = "neo4j-source-topic"
4344
}
4445

4546
@Neo4jSource(
@@ -219,6 +220,54 @@ class Neo4jSourceJsonEmbeddedExtendedIT : Neo4jSourceQueryIT()
219220
@KeyValueConverter(key = JSON_EMBEDDED, value = JSON_EMBEDDED, payloadMode = PayloadMode.COMPACT)
220221
class Neo4jSourceJsonEmbeddedCompactIT : Neo4jSourceQueryIT()
221222

223+
// it doesn't make sense to add EXTENDED mode for JSON_RAW since it's not a schema supporting
224+
// converter
225+
@KeyValueConverter(key = JSON_RAW, value = JSON_RAW, payloadMode = PayloadMode.COMPACT)
226+
class Neo4jSourceJsonRawCompactIT : Neo4jSourceQueryIT() {
227+
228+
@Neo4jSource(
229+
topic = TOPIC,
230+
strategy = SourceStrategy.QUERY,
231+
streamingProperty = "timestamp",
232+
startFrom = "USER_PROVIDED",
233+
startFromValue = "1704067200000", // 2024-01-01T00:00:00
234+
query =
235+
"WITH {id: 'ROOT_ID', list: [{ property1: 'value1' }, { property2: 'value2' }]} AS data RETURN data, data.id AS guid, dateTime().epochMillis AS timestamp")
236+
@Test
237+
fun `serializes list of heterogeneous objects as map by default`(
238+
@TopicConsumer(topic = TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer
239+
) = runTest {
240+
TopicVerifier.createForMap(consumer)
241+
.assertMessageValue { value ->
242+
val list = (value["data"] as Map<*, *>)["list"]
243+
list shouldBe
244+
mapOf("e0" to mapOf("property1" to "value1"), "e1" to mapOf("property2" to "value2"))
245+
}
246+
.verifyWithin(Duration.ofSeconds(30))
247+
}
248+
249+
@Neo4jSource(
250+
topic = TOPIC,
251+
strategy = SourceStrategy.QUERY,
252+
streamingProperty = "timestamp",
253+
startFrom = "USER_PROVIDED",
254+
startFromValue = "1704067200000", // 2024-01-01T00:00:00
255+
forceMapsAsStruct = false,
256+
query =
257+
"WITH {id: 'ROOT_ID', list: [{ property1: 'value1' }, { property2: 'value2' }]} AS data RETURN data, data.id AS guid, dateTime().epochMillis AS timestamp")
258+
@Test
259+
fun `serializes list of heterogeneous objects as list when not forcing structs for map values with homogeneous value types`(
260+
@TopicConsumer(topic = TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer
261+
) = runTest {
262+
TopicVerifier.createForMap(consumer)
263+
.assertMessageValue { value ->
264+
val list = (value["data"] as Map<*, *>)["list"]
265+
list shouldBe listOf(mapOf("property1" to "value1"), mapOf("property2" to "value2"))
266+
}
267+
.verifyWithin(Duration.ofSeconds(30))
268+
}
269+
}
270+
222271
@KeyValueConverter(key = PROTOBUF, value = PROTOBUF, payloadMode = PayloadMode.EXTENDED)
223272
class Neo4jSourceProtobufExtendedIT : Neo4jSourceQueryIT()
224273

source/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jQueryTask.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ class Neo4jQueryTask : SourceTask() {
9797
val recordAsMap = record.asMap()
9898
val schema =
9999
DynamicTypes.toConnectSchema(
100-
config.payloadMode, recordAsMap, optional = true, forceMapsAsStruct = true)
100+
config.payloadMode,
101+
recordAsMap,
102+
optional = true,
103+
forceMapsAsStruct = config.forceMapsAsStruct)
101104
val value = DynamicTypes.toConnectValue(schema, recordAsMap)
102105

103106
return SourceRecord(

source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ enum class StartFrom {
5858
class SourceConfiguration(originals: Map<*, *>) :
5959
Neo4jConfiguration(config(), originals, ConnectorType.SOURCE) {
6060

61+
val forceMapsAsStruct: Boolean
62+
get(): Boolean = getBoolean(QUERY_FORCE_MAPS_AS_STRUCT)
63+
6164
val startFrom
6265
get(): StartFrom = StartFrom.valueOf(getString(START_FROM))
6366

@@ -456,6 +459,7 @@ class SourceConfiguration(originals: Map<*, *>) :
456459
const val QUERY_POLL_DURATION = "neo4j.query.poll-duration"
457460
const val QUERY_TIMEOUT = "neo4j.query.timeout"
458461
const val QUERY_TOPIC = "neo4j.query.topic"
462+
const val QUERY_FORCE_MAPS_AS_STRUCT = "neo4j.query.force-maps-as-struct"
459463
const val CDC_POLL_INTERVAL = "neo4j.cdc.poll-interval"
460464
const val CDC_POLL_DURATION = "neo4j.cdc.poll-duration"
461465
const val PAYLOAD_MODE = "neo4j.payload-mode"
@@ -489,6 +493,8 @@ class SourceConfiguration(originals: Map<*, *>) :
489493
private val DEFAULT_QUERY_POLL_DURATION = 5.seconds
490494
private const val DEFAULT_BATCH_SIZE = 1000
491495
private val DEFAULT_QUERY_TIMEOUT = 0.seconds
496+
private const val DEFAULT_QUERY_FORCE_MAPS_AS_STRUCT = true
497+
492498
private val DEFAULT_CDC_POLL_INTERVAL = 1.seconds
493499
private val DEFAULT_CDC_POLL_DURATION = 5.seconds
494500
private const val DEFAULT_STREAMING_PROPERTY = "timestamp"
@@ -643,6 +649,14 @@ class SourceConfiguration(originals: Map<*, *>) :
643649
Recommenders.visibleIf(STRATEGY, Predicate.isEqual(SourceType.QUERY.name))
644650
validator = Validators.pattern(SIMPLE_DURATION_PATTERN)
645651
})
652+
.define(
653+
ConfigKeyBuilder.of(QUERY_FORCE_MAPS_AS_STRUCT, ConfigDef.Type.BOOLEAN) {
654+
importance = ConfigDef.Importance.LOW
655+
defaultValue = DEFAULT_QUERY_FORCE_MAPS_AS_STRUCT
656+
group = Groups.CONNECTOR_ADVANCED.title
657+
recommender =
658+
Recommenders.visibleIf(STRATEGY, Predicate.isEqual(SourceType.QUERY.name))
659+
})
646660
.define(
647661
ConfigKeyBuilder.of(BATCH_SIZE, ConfigDef.Type.INT) {
648662
importance = ConfigDef.Importance.MEDIUM

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/assertions/TopicVerifier.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,23 @@ class TopicVerifier<K, V>(
159159
topic: String,
160160
value: ByteArray?,
161161
): Any? {
162-
return when (val sourceValue = converter.toConnectData(topic, value).value()) {
162+
val connectData = converter.toConnectData(topic, value)
163+
return when (val sourceValue = connectData.value()) {
163164
is Struct ->
164165
when (assertionClass) {
165166
ChangeEvent::class.java -> sourceValue.toChangeEvent()
166167
Map::class.java ->
167168
DynamicTypes.fromConnectValue(sourceValue.schema(), sourceValue, true)
168169
else -> sourceValue as V
169170
}
171+
is Map<*, *> -> {
172+
val schema = connectData.schema()
173+
if (schema == null) {
174+
sourceValue
175+
} else {
176+
DynamicTypes.fromConnectValue(schema, sourceValue, true)
177+
}
178+
}
170179
else -> sourceValue
171180
}
172181
}
@@ -179,6 +188,7 @@ class TopicVerifier<K, V>(
179188
when (consumer.keyConverter) {
180189
KafkaConverter.JSON_EMBEDDED ->
181190
keyConverter.configure(mapOf("schemas.enable" to true), true)
191+
KafkaConverter.JSON_RAW -> keyConverter.configure(mapOf("schemas.enable" to false), true)
182192
else ->
183193
keyConverter.configure(
184194
mapOf(
@@ -193,6 +203,7 @@ class TopicVerifier<K, V>(
193203
when (consumer.valueConverter) {
194204
KafkaConverter.JSON_EMBEDDED ->
195205
valueConverter.configure(mapOf("schemas.enable" to true), false)
206+
KafkaConverter.JSON_RAW -> valueConverter.configure(mapOf("schemas.enable" to false), false)
196207
else ->
197208
valueConverter.configure(
198209
mapOf(

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/format/KafkaConverter.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ enum class KafkaConverter(
7676
converterProvider = { JsonConverter() },
7777
serializerClass = KafkaJsonSerializer::class.java,
7878
testShimSerializer = JsonRawSerializer,
79+
supportsSchemaRegistry = false,
7980
additionalProperties = JSON_RAW_OPTIONS,
8081
),
8182
PROTOBUF(

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSource.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ annotation class Neo4jSource(
4040
val topic: String = "",
4141
val streamingProperty: String = "",
4242
val query: String = "",
43+
val forceMapsAsStruct: Boolean = true,
4344

4445
// CDC strategy
4546
val cdc: CdcSource = CdcSource()

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ internal class Neo4jSourceExtension(
184184
startFrom = sourceAnnotation.startFrom,
185185
startFromValue = sourceAnnotation.startFromValue,
186186
query = sourceAnnotation.query,
187+
forceMapsAsStruct = sourceAnnotation.forceMapsAsStruct,
187188
strategy = sourceAnnotation.strategy,
188189
keyConverter = keyValueConverterResolver.resolveKeyConverter(context),
189190
valueConverter = keyValueConverterResolver.resolveValueConverter(context),

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceRegistration.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ internal class Neo4jSourceRegistration(
4949
cdcMetadata: Map<String, List<Map<String, String>>>,
5050
cdcKeySerializations: Map<String, String>,
5151
cdcValueSerializations: Map<String, String>,
52-
payloadMode: PayloadMode
52+
payloadMode: PayloadMode,
53+
forceMapsAsStruct: Boolean
5354
) {
5455

5556
val name: String = randomizedName("Neo4jSourceConnector")
@@ -92,6 +93,7 @@ internal class Neo4jSourceRegistration(
9293
if (strategy == QUERY) {
9394
put("neo4j.query.topic", topic)
9495
put("neo4j.query", query)
96+
put("neo4j.query.force-maps-as-struct", forceMapsAsStruct)
9597
put("neo4j.query.streaming-property", streamingProperty)
9698
put("neo4j.query.poll-interval", "${pollInterval.toMillis()}ms")
9799
put("neo4j.query.poll-duration", "${pollDuration.toMillis()}ms")

0 commit comments

Comments
 (0)