Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ object DynamicTypes {
.build()
is Collection<*> -> {
val elementTypes = value.map { it?.javaClass?.kotlin }.toSet()
if (elementTypes.isEmpty()) {
return PropertyType.schema
}

val elementType = elementTypes.singleOrNull()
if (elementType != null && isSimplePropertyType(elementType)) {
return PropertyType.schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,34 @@ object PropertyType {
internal const val POINT = "SP"
internal const val POINT_LIST = "LSP"

private val SIMPLE_TYPE_FIELDS =
listOf(
BOOLEAN,
LONG,
FLOAT,
STRING,
BYTES,
LOCAL_DATE,
LOCAL_DATE_TIME,
LOCAL_TIME,
ZONED_DATE_TIME,
OFFSET_TIME,
DURATION,
POINT)
private val LIST_TYPE_FIELDS =
listOf(
BOOLEAN_LIST,
LONG_LIST,
FLOAT_LIST,
STRING_LIST,
LOCAL_DATE_LIST,
LOCAL_DATE_TIME_LIST,
LOCAL_TIME_LIST,
ZONED_DATE_TIME_LIST,
OFFSET_TIME_LIST,
DURATION_LIST,
POINT_LIST)

internal val durationSchema: Schema =
SchemaBuilder(Schema.Type.STRUCT)
.field(MONTHS, Schema.INT64_SCHEMA)
Expand Down Expand Up @@ -175,6 +203,10 @@ object PropertyType {
is Array<*> -> asList(value.toList(), value::class.java.componentType.kotlin)
is Iterable<*> -> {
val elementTypes = value.map { it?.javaClass?.kotlin }.toSet()
if (elementTypes.isEmpty()) {
return asList(value, Int::class)
}

val elementType = elementTypes.singleOrNull()
if (elementType != null) {
return asList(value, elementType)
Expand Down Expand Up @@ -275,50 +307,73 @@ object PropertyType {

fun fromConnectValue(value: Struct?): Any? {
return value?.let {
for (f in it.schema().fields()) {
val fieldValue = it.getWithoutDefault(f.name())
// not set list fields are returned back as empty lists, so we are looking for a non-empty
// field here
if (fieldValue == null || (fieldValue is Collection<*> && fieldValue.isEmpty())) {
continue
}

return when (f.name()) {
BOOLEAN -> it.get(f) as Boolean
BOOLEAN_LIST -> it.get(f) as List<*>
LONG -> it.get(f) as Long
LONG_LIST -> it.get(f) as List<*>
FLOAT -> it.get(f) as Double
FLOAT_LIST -> it.get(f) as List<*>
STRING -> it.get(f) as String
STRING_LIST -> it.get(f) as List<*>
val simpleFieldAndValue =
SIMPLE_TYPE_FIELDS.firstNotNullOfOrNull { f ->
val fieldValue = it.getWithoutDefault(f)
if (fieldValue != null) Pair(f, fieldValue) else null
}
if (simpleFieldAndValue != null) {
return when (simpleFieldAndValue.first) {
BOOLEAN -> simpleFieldAndValue.second as Boolean
LONG -> simpleFieldAndValue.second as Long
FLOAT -> simpleFieldAndValue.second as Double
STRING -> simpleFieldAndValue.second as String
BYTES ->
when (val bytes = it.get(f)) {
when (val bytes = simpleFieldAndValue.second) {
is ByteArray -> bytes
is ByteBuffer -> bytes.array()
else ->
throw IllegalArgumentException(
"unsupported BYTES value: ${bytes.javaClass.name}")
}
LOCAL_DATE -> parseLocalDate((it.get(f) as String))
LOCAL_DATE_LIST -> (it.get(f) as List<String>).map { s -> parseLocalDate(s) }
LOCAL_TIME -> parseLocalTime((it.get(f) as String))
LOCAL_TIME_LIST -> (it.get(f) as List<String>).map { s -> parseLocalTime(s) }
LOCAL_DATE_TIME -> parseLocalDateTime((it.get(f) as String))
LOCAL_DATE_TIME_LIST -> (it.get(f) as List<String>).map { s -> parseLocalDateTime(s) }
ZONED_DATE_TIME -> parseZonedDateTime((it.get(f) as String))
ZONED_DATE_TIME_LIST -> (it.get(f) as List<String>).map { s -> parseZonedDateTime(s) }
OFFSET_TIME -> parseOffsetTime((it.get(f) as String))
OFFSET_TIME_LIST -> (it.get(f) as List<String>).map { s -> parseOffsetTime(s) }
DURATION -> toDuration((it.get(f) as Struct))
DURATION_LIST -> (it.get(f) as List<Struct>).map { s -> toDuration(s) }
POINT -> toPoint((it.get(f) as Struct))
POINT_LIST -> (it.get(f) as List<Struct>).map { s -> toPoint(s) }
else -> throw IllegalArgumentException("unsupported neo4j type: ${f.name()}")
LOCAL_DATE -> parseLocalDate((simpleFieldAndValue.second as String))
LOCAL_TIME -> parseLocalTime((simpleFieldAndValue.second as String))
LOCAL_DATE_TIME -> parseLocalDateTime((simpleFieldAndValue.second as String))
ZONED_DATE_TIME -> parseZonedDateTime((simpleFieldAndValue.second as String))
OFFSET_TIME -> parseOffsetTime((simpleFieldAndValue.second as String))
DURATION -> toDuration((simpleFieldAndValue.second as Struct))
POINT -> toPoint((simpleFieldAndValue.second as Struct))
else ->
throw IllegalArgumentException(
"unsupported simple type: ${simpleFieldAndValue.first}: ${simpleFieldAndValue.second.javaClass.name}")
}
}

val listFieldAndValue =
LIST_TYPE_FIELDS.firstNotNullOfOrNull { f ->
val fieldValue = it.getWithoutDefault(f)
if (fieldValue != null && (fieldValue is Collection<*> && fieldValue.isNotEmpty()))
Pair(f, fieldValue)
else null
}
if (listFieldAndValue != null) {
return when (listFieldAndValue.first) {
BOOLEAN_LIST -> listFieldAndValue.second as List<*>
LONG_LIST -> listFieldAndValue.second as List<*>
FLOAT_LIST -> listFieldAndValue.second as List<*>
STRING_LIST -> listFieldAndValue.second as List<*>
LOCAL_DATE_LIST ->
(listFieldAndValue.second as List<String>).map { s -> parseLocalDate(s) }
LOCAL_TIME_LIST ->
(listFieldAndValue.second as List<String>).map { s -> parseLocalTime(s) }
LOCAL_DATE_TIME_LIST ->
(listFieldAndValue.second as List<String>).map { s -> parseLocalDateTime(s) }
ZONED_DATE_TIME_LIST ->
(listFieldAndValue.second as List<String>).map { s -> parseZonedDateTime(s) }
OFFSET_TIME_LIST ->
(listFieldAndValue.second as List<String>).map { s -> parseOffsetTime(s) }
DURATION_LIST -> (listFieldAndValue.second as List<Struct>).map { s -> toDuration(s) }
POINT_LIST -> (listFieldAndValue.second as List<Struct>).map { s -> toPoint(s) }
else ->
throw IllegalArgumentException(
"unsupported list type: ${listFieldAndValue.first}: ${listFieldAndValue.second.javaClass.name}")
}
}

return null
// Protobuf does not support NULLs in repeated fields, so we always receive LIST typed fields
// as empty lists. If we could not find a simple field and also a non-empty list field, we
// assume the value is an empty list.
return emptyList<Any>()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.kotest.assertions.withClue
import io.kotest.matchers.shouldBe
import io.kotest.matchers.throwable.shouldHaveMessage
import java.nio.ByteBuffer
import java.time.Duration
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
Expand All @@ -45,6 +46,7 @@ import org.neo4j.connectors.kafka.data.PropertyType.LOCAL_DATE
import org.neo4j.driver.Value
import org.neo4j.driver.Values
import org.neo4j.driver.types.Node
import org.neo4j.driver.types.Point
import org.neo4j.driver.types.Relationship

class DynamicTypesTest {
Expand Down Expand Up @@ -115,44 +117,55 @@ class DynamicTypesTest {
Arguments.of("bool array", BooleanArray(1) { true }, null),
Arguments.of("array (bool)", Array(1) { true }, null),
Arguments.of("list (bool)", listOf(true), null),
Arguments.of("empty list (bool)", emptyList<Boolean>(), null),
Arguments.of("short array (empty)", ShortArray(0), null),
Arguments.of("short array", ShortArray(1) { 1.toShort() }, listOf(1L)),
Arguments.of("array (short)", Array(1) { 1.toShort() }, listOf(1L)),
Arguments.of("list (short)", listOf(1.toShort()), listOf(1L)),
Arguments.of("empty list (short)", emptyList<Short>(), null),
Arguments.of("int array (empty)", IntArray(0), null),
Arguments.of("int array", IntArray(1) { 1 }, listOf(1L)),
Arguments.of("array (int)", Array(1) { 1 }, listOf(1L)),
Arguments.of("list (int)", listOf(1), listOf(1L)),
Arguments.of("empty list (int)", emptyList<Int>(), null),
Arguments.of("long array (empty)", LongArray(0), null),
Arguments.of("long array", LongArray(1) { 1.toLong() }, null),
Arguments.of("array (long)", Array(1) { 1.toLong() }, null),
Arguments.of("list (long)", listOf(1L), null),
Arguments.of("empty list (long)", emptyList<Long>(), null),
Arguments.of("float array (empty)", FloatArray(0), null),
Arguments.of("float array", FloatArray(1) { 1.toFloat() }, listOf(1.toDouble())),
Arguments.of("array (float)", Array(1) { 1.toFloat() }, null),
Arguments.of("list (float)", listOf(1.toFloat()), null),
Arguments.of("empty list (float)", emptyList<Float>(), null),
Arguments.of("double array (empty)", DoubleArray(0), null),
Arguments.of("double array", DoubleArray(1) { 1.toDouble() }, null),
Arguments.of("array (double)", Array(1) { 1.toDouble() }, null),
Arguments.of("list (double)", listOf(1.toDouble()), null),
Arguments.of("empty list (double)", emptyList<Double>(), null),
Arguments.of("array (string)", Array(1) { "a" }, null),
Arguments.of("list (string)", listOf("a"), null),
Arguments.of("empty list (string)", emptyList<String>(), null),
Arguments.of("array (local date)", Array(1) { LocalDate.of(1999, 12, 31) }, null),
Arguments.of("list (local date)", listOf(LocalDate.of(1999, 12, 31)), null),
Arguments.of("empty list (local date)", emptyList<LocalDate>(), null),
Arguments.of("array (local time)", Array(1) { LocalTime.of(23, 59, 59) }, null),
Arguments.of("list (local time)", listOf(LocalTime.of(23, 59, 59)), null),
Arguments.of("empty list (local time)", emptyList<LocalTime>(), null),
Arguments.of(
"array (local date time)",
Array(1) { LocalDateTime.of(1999, 12, 31, 23, 59, 59) },
null),
Arguments.of(
"list (local date time)", listOf(LocalDateTime.of(1999, 12, 31, 23, 59, 59)), null),
Arguments.of("empty list (local date time)", emptyList<LocalDateTime>(), null),
Arguments.of(
"array (offset time)",
Array(1) { OffsetTime.of(23, 59, 59, 0, ZoneOffset.UTC) },
null),
Arguments.of(
"list (offset time)", listOf(OffsetTime.of(23, 59, 59, 0, ZoneOffset.UTC)), null),
Arguments.of("empty list (offset time)", emptyList<OffsetTime>(), null),
Arguments.of(
"array (offset date time)",
Array(1) { OffsetDateTime.of(1999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC) },
Expand All @@ -161,6 +174,7 @@ class DynamicTypesTest {
"list (offset date time)",
listOf(OffsetDateTime.of(1999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC)),
null),
Arguments.of("empty list (offset date time)", emptyList<OffsetDateTime>(), null),
Arguments.of(
"array (zoned date time)",
Array(1) {
Expand All @@ -171,6 +185,7 @@ class DynamicTypesTest {
"list (zoned date time)",
listOf(ZonedDateTime.of(1999, 12, 31, 23, 59, 59, 0, ZoneId.of("Europe/London"))),
null),
Arguments.of("empty list (zoned date time)", emptyList<ZonedDateTime>(), null),
Arguments.of(
"array (duration)",
Array(1) { Values.isoDuration(12, 12, 59, 1230).asIsoDuration() },
Expand All @@ -179,13 +194,15 @@ class DynamicTypesTest {
"list (duration)",
listOf(Values.isoDuration(12, 12, 59, 1230).asIsoDuration()),
null),
Arguments.of("empty list (duration)", emptyList<Duration>(), null),
Arguments.of(
"array (point (2d))", Array(1) { Values.point(4326, 1.0, 2.0).asPoint() }, null),
Arguments.of("list (point (2d))", listOf(Values.point(4326, 1.0, 2.0).asPoint()), null),
Arguments.of(
"array (point (3d))", Array(1) { Values.point(4326, 1.0, 2.0, 3.0).asPoint() }, null),
Arguments.of(
"list (point (3d))", listOf(Values.point(4326, 1.0, 2.0, 3.0).asPoint()), null),
Arguments.of("empty list (point)", emptyList<Point>(), null),
)
}
}
Expand Down Expand Up @@ -239,17 +256,39 @@ class DynamicTypesTest {
}

@Test
fun `empty collections or arrays should map to an array of property type`() {
listOf(listOf<Any>(), setOf<Any>(), arrayOf<Any>()).forEach { collection ->
fun `empty collections should map to property type`() {
listOf(listOf<Any>(), setOf<Any>()).forEach { collection ->
withClue(collection) {
DynamicTypes.toConnectSchema(collection, false) shouldBe
SchemaBuilder.array(PropertyType.schema).build()
DynamicTypes.toConnectSchema(collection, true) shouldBe
SchemaBuilder.array(PropertyType.schema).optional().build()
DynamicTypes.toConnectSchema(collection, false) shouldBe PropertyType.schema
DynamicTypes.toConnectSchema(collection, true) shouldBe PropertyType.schema
}
}
}

@Test
fun `empty arrays should map to an array of property type`() {
DynamicTypes.toConnectSchema(arrayOf<Any>(), false) shouldBe
SchemaBuilder.array(PropertyType.schema).build()
DynamicTypes.toConnectSchema(arrayOf<Any>(), true) shouldBe
SchemaBuilder.array(PropertyType.schema).optional().build()
}

@Test
fun `empty arrays of simple types should map to property type`() {
listOf(
arrayOf<Int>(),
arrayOf<String>(),
arrayOf<LocalDate>(),
arrayOf<Boolean>(),
arrayOf<Point>())
.forEach { array ->
withClue(array) {
DynamicTypes.toConnectSchema(array, false) shouldBe PropertyType.schema
DynamicTypes.toConnectSchema(array, true) shouldBe PropertyType.schema
}
}
}

@ParameterizedTest(name = "{0}")
@ArgumentsSource(PropertyTypedCollectionProvider::class)
fun `collections with elements of property types should map to an array schema`(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ class PropertyTypeTest {
.put(Z, 3.0)
.put(DIMENSION, THREE_D))),
listOf(Values.point(4326, 1.0, 2.0, 3.0).asPoint())),
Arguments.of(
"empty list (any)",
emptyList<Any>(),
Struct(PropertyType.schema).put(LONG_LIST, emptyList<Long>()),
emptyList<Any>()),
Arguments.of(
"empty list (typed)",
emptyList<Int>(),
Struct(PropertyType.schema).put(LONG_LIST, emptyList<Long>()),
emptyList<Any>()),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ class TypesTest {
.put("x", 12.78)
.put("y", 56.7)
.put("z", 100.0))),
Arguments.of(
Named.of("list - empty", emptyList<Any>()),
PropertyType.schema,
Struct(PropertyType.schema).put(LONG_LIST, emptyList<Long>())),
Arguments.of(
Named.of("list - long", (1L..50L).toList()),
PropertyType.schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test
import org.neo4j.cdc.client.model.ChangeEvent
import org.neo4j.connectors.kafka.connect.ConnectHeader
import org.neo4j.connectors.kafka.data.Headers
import org.neo4j.connectors.kafka.data.PropertyType.schema
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.AVRO
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_SCHEMA
Expand Down
Loading