Skip to content

Commit dca9d21

Browse files
authored
feat: support kafka connect built-in types (#261)
1 parent 7ecee27 commit dca9d21

File tree

9 files changed

+678
-155
lines changed

9 files changed

+678
-155
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/DynamicTypes.kt

Lines changed: 292 additions & 142 deletions
Large diffs are not rendered by default.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.data
18+
19+
import io.kotest.matchers.shouldBe
20+
import java.math.BigDecimal
21+
import java.math.BigInteger
22+
import java.time.LocalDate
23+
import java.time.LocalDateTime
24+
import java.time.LocalTime
25+
import java.time.OffsetDateTime
26+
import java.time.ZoneOffset
27+
import org.apache.kafka.connect.data.Date
28+
import org.apache.kafka.connect.data.Decimal
29+
import org.apache.kafka.connect.data.Time
30+
import org.apache.kafka.connect.data.Timestamp
31+
import org.junit.jupiter.api.Test
32+
import org.neo4j.connectors.kafka.data.DynamicTypes.MILLIS_PER_DAY
33+
34+
class DynamicTypesTest {
35+
@Test
36+
fun `kafka connect date values should be returned as local date`() {
37+
DynamicTypes.fromConnectValue(
38+
Date.SCHEMA,
39+
java.util.Date(LocalDate.of(2000, 1, 1).toEpochDay() * MILLIS_PER_DAY)) shouldBe
40+
LocalDate.of(2000, 1, 1)
41+
}
42+
43+
@Test
44+
fun `kafka connect time values should be returned as local time`() {
45+
DynamicTypes.fromConnectValue(
46+
Time.SCHEMA,
47+
java.util.Date(
48+
OffsetDateTime.of(
49+
LocalDate.EPOCH, LocalTime.of(23, 59, 59, 999_000_000), ZoneOffset.UTC)
50+
.toInstant()
51+
.toEpochMilli())) shouldBe LocalTime.of(23, 59, 59, 999_000_000)
52+
}
53+
54+
@Test
55+
fun `kafka connect timestamp values should be returned as local time`() {
56+
DynamicTypes.fromConnectValue(
57+
Timestamp.SCHEMA,
58+
java.util.Date(
59+
OffsetDateTime.of(
60+
LocalDate.of(2000, 1, 1), LocalTime.of(23, 59, 59, 999_000_000), ZoneOffset.UTC)
61+
.toInstant()
62+
.toEpochMilli())) shouldBe LocalDateTime.of(2000, 1, 1, 23, 59, 59, 999_000_000)
63+
}
64+
65+
@Test
66+
fun `kafka connect decimal values should be returned as string`() {
67+
DynamicTypes.fromConnectValue(
68+
Decimal.schema(4), BigDecimal(BigInteger("12345678901234567890"), 4)) shouldBe
69+
"1234567890123456.7890"
70+
DynamicTypes.fromConnectValue(
71+
Decimal.schema(6), BigDecimal(BigInteger("12345678901234567890"), 6)) shouldBe
72+
"12345678901234.567890"
73+
DynamicTypes.fromConnectValue(
74+
Decimal.schema(6), BigDecimal(BigInteger("123456000000"), 6)) shouldBe "123456.000000"
75+
}
76+
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
<netty.version>4.1.116.Final</netty.version>
8787
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
8888
<!-- to match what is shipped with the Kafka Connect version used in our integration test environment -->
89-
<protobuf.version>3.24.4</protobuf.version>
89+
<protobuf.version>3.25.5</protobuf.version>
9090
<reactor.version>2024.0.1</reactor.version>
9191
<slf4j.version>1.7.36</slf4j.version>
9292
<sortpom-maven-plugin.version>4.0.0</sortpom-maven-plugin.version>

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCudIT.kt

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,23 @@ import com.fasterxml.jackson.databind.ObjectMapper
2020
import io.kotest.assertions.nondeterministic.eventually
2121
import io.kotest.matchers.should
2222
import io.kotest.matchers.shouldBe
23+
import java.math.BigDecimal
2324
import java.time.LocalDate
25+
import java.time.LocalDateTime
26+
import java.time.LocalTime
2427
import kotlin.time.Duration.Companion.seconds
28+
import org.apache.kafka.connect.data.Date
29+
import org.apache.kafka.connect.data.Decimal
2530
import org.apache.kafka.connect.data.Schema
2631
import org.apache.kafka.connect.data.SchemaBuilder
2732
import org.apache.kafka.connect.data.Struct
33+
import org.apache.kafka.connect.data.Time
34+
import org.apache.kafka.connect.data.Timestamp
2835
import org.junit.jupiter.api.Test
2936
import org.neo4j.connectors.kafka.data.DynamicTypes
3037
import org.neo4j.connectors.kafka.data.PropertyType
3138
import org.neo4j.connectors.kafka.data.PropertyType.schema
39+
import org.neo4j.connectors.kafka.testing.DateSupport
3240
import org.neo4j.connectors.kafka.testing.TestSupport.runTest
3341
import org.neo4j.connectors.kafka.testing.format.KafkaConverter
3442
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -630,6 +638,62 @@ abstract class Neo4jCudIT {
630638
}
631639
}
632640

641+
@Neo4jSink(cud = [CudStrategy(TOPIC)])
642+
@Test
643+
fun `should create node from struct with connect types`(
644+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
645+
session: Session
646+
) = runTest {
647+
session.run("CREATE CONSTRAINT FOR (n:Foo) REQUIRE n.id IS KEY").consume()
648+
session.run("CREATE CONSTRAINT FOR (n:Bar) REQUIRE n.id IS KEY").consume()
649+
650+
val propertiesSchema =
651+
SchemaBuilder.struct()
652+
.field("id", Schema.INT64_SCHEMA)
653+
.field("height", Decimal.schema(2))
654+
.field("dob", Date.SCHEMA)
655+
.field("tob", Time.SCHEMA)
656+
.field("tsob", Timestamp.SCHEMA)
657+
.build()
658+
val createNodeSchema =
659+
SchemaBuilder.struct()
660+
.field("type", Schema.STRING_SCHEMA)
661+
.field("op", Schema.STRING_SCHEMA)
662+
.field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA))
663+
.field("properties", propertiesSchema)
664+
.build()
665+
666+
producer.publish(
667+
valueSchema = createNodeSchema,
668+
value =
669+
Struct(createNodeSchema)
670+
.put("type", "node")
671+
.put("op", "create")
672+
.put("labels", listOf("Foo", "Bar"))
673+
.put(
674+
"properties",
675+
Struct(propertiesSchema)
676+
.put("id", 1L)
677+
.put("height", BigDecimal.valueOf(185, 2))
678+
.put("dob", DateSupport.date(1978, 1, 15))
679+
.put("tob", DateSupport.time(7, 45, 12, 999))
680+
.put("tsob", DateSupport.timestamp(1978, 1, 15, 7, 45, 12, 999))))
681+
682+
eventually(30.seconds) { session.run("MATCH (n) RETURN n", emptyMap()).single() }
683+
.get("n")
684+
.asNode() should
685+
{
686+
it.labels() shouldBe listOf("Foo", "Bar")
687+
it.asMap() shouldBe
688+
mapOf(
689+
"id" to 1L,
690+
"height" to "1.85",
691+
"dob" to LocalDate.of(1978, 1, 15),
692+
"tob" to LocalTime.of(7, 45, 12, 999000000),
693+
"tsob" to LocalDateTime.of(1978, 1, 15, 7, 45, 12, 999000000))
694+
}
695+
}
696+
633697
@Neo4jSink(cud = [CudStrategy(TOPIC)])
634698
@Test
635699
fun `should create relationship`(

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCypherIT.kt

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import io.kotest.assertions.nondeterministic.eventually
2222
import io.kotest.matchers.collections.shouldContainExactly
2323
import io.kotest.matchers.should
2424
import io.kotest.matchers.shouldBe
25+
import java.math.BigDecimal
26+
import java.math.BigInteger
2527
import java.time.Instant
2628
import java.time.LocalDate
2729
import java.time.LocalDateTime
@@ -33,9 +35,13 @@ import java.time.ZonedDateTime
3335
import java.time.temporal.ChronoUnit
3436
import java.util.stream.Stream
3537
import kotlin.time.Duration.Companion.seconds
38+
import org.apache.kafka.connect.data.Date
39+
import org.apache.kafka.connect.data.Decimal
3640
import org.apache.kafka.connect.data.Schema
3741
import org.apache.kafka.connect.data.SchemaBuilder
3842
import org.apache.kafka.connect.data.Struct
43+
import org.apache.kafka.connect.data.Time
44+
import org.apache.kafka.connect.data.Timestamp
3945
import org.junit.jupiter.api.Test
4046
import org.junit.jupiter.api.extension.ExtensionContext
4147
import org.junit.jupiter.params.ParameterizedTest
@@ -45,6 +51,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource
4551
import org.neo4j.connectors.kafka.configuration.PayloadMode
4652
import org.neo4j.connectors.kafka.data.DynamicTypes
4753
import org.neo4j.connectors.kafka.data.PropertyType
54+
import org.neo4j.connectors.kafka.testing.DateSupport
4855
import org.neo4j.connectors.kafka.testing.TestSupport.runTest
4956
import org.neo4j.connectors.kafka.testing.format.KafkaConverter
5057
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -217,7 +224,7 @@ abstract class Neo4jCypherIT {
217224
cypher = [CypherStrategy(TOPIC, "CREATE (p:Data) SET p.value = event")],
218225
schemaControlValueCompatibility = SchemaCompatibilityMode.NONE)
219226
@ParameterizedTest
220-
@ArgumentsSource(AvroSimpleTypes::class)
227+
@ArgumentsSource(SimpleTypes::class)
221228
fun `should support connect simple types`(
222229
schema: Schema,
223230
value: Any?,
@@ -236,7 +243,7 @@ abstract class Neo4jCypherIT {
236243
}
237244
}
238245

239-
object AvroSimpleTypes : ArgumentsProvider {
246+
object SimpleTypes : ArgumentsProvider {
240247
override fun provideArguments(context: ExtensionContext?): Stream<out Arguments> {
241248
return Stream.of(
242249
Arguments.of(Schema.INT8_SCHEMA, Byte.MAX_VALUE, Byte.MAX_VALUE),
@@ -260,7 +267,47 @@ abstract class Neo4jCypherIT {
260267
Arguments.of(
261268
Schema.OPTIONAL_BYTES_SCHEMA,
262269
"a string".encodeToByteArray(),
263-
"a string".encodeToByteArray()))
270+
"a string".encodeToByteArray()),
271+
)
272+
}
273+
}
274+
275+
@Neo4jSink(
276+
cypher = [CypherStrategy(TOPIC, "CREATE (p:Data) SET p.value = event")],
277+
schemaControlValueCompatibility = SchemaCompatibilityMode.NONE)
278+
@ParameterizedTest
279+
@ArgumentsSource(ConnectTypes::class)
280+
fun `should support connect types`(
281+
schema: Schema,
282+
value: Any?,
283+
expected: Any?,
284+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
285+
session: Session
286+
) = runTest {
287+
producer.publish(valueSchema = schema, value = value)
288+
289+
eventually(30.seconds) { session.run("MATCH (n:Data) RETURN n", emptyMap()).single() }
290+
.get("n")
291+
.asNode() should
292+
{
293+
it.labels() shouldBe listOf("Data")
294+
it.asMap() shouldBe mapOf("value" to expected)
295+
}
296+
}
297+
298+
object ConnectTypes : ArgumentsProvider {
299+
override fun provideArguments(context: ExtensionContext?): Stream<out Arguments> {
300+
return Stream.of(
301+
Arguments.of(Date.SCHEMA, DateSupport.date(2000, 1, 1), LocalDate.of(2000, 1, 1)),
302+
Arguments.of(
303+
Time.SCHEMA, DateSupport.time(23, 59, 59, 999), LocalTime.of(23, 59, 59, 999000000)),
304+
Arguments.of(
305+
Timestamp.SCHEMA,
306+
DateSupport.timestamp(2000, 1, 1, 23, 59, 59, 999),
307+
LocalDateTime.of(2000, 1, 1, 23, 59, 59, 999000000)),
308+
Arguments.of(Decimal.schema(4), BigDecimal(BigInteger("1234567890"), 4), "123456.7890"),
309+
Arguments.of(
310+
Decimal.schema(6), BigDecimal(BigInteger("1234567890000"), 6), "1234567.890000"))
264311
}
265312
}
266313

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jNodePatternIT.kt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,26 @@ import com.fasterxml.jackson.databind.ObjectMapper
2020
import io.kotest.assertions.nondeterministic.eventually
2121
import io.kotest.matchers.should
2222
import io.kotest.matchers.shouldBe
23+
import java.math.BigDecimal
2324
import java.time.Instant
2425
import java.time.LocalDate
26+
import java.time.LocalDateTime
27+
import java.time.LocalTime
2528
import java.time.ZoneOffset
2629
import java.time.temporal.ChronoUnit
2730
import kotlin.time.Duration.Companion.seconds
31+
import org.apache.kafka.connect.data.Date
32+
import org.apache.kafka.connect.data.Decimal
2833
import org.apache.kafka.connect.data.Schema
2934
import org.apache.kafka.connect.data.SchemaBuilder
3035
import org.apache.kafka.connect.data.Struct
36+
import org.apache.kafka.connect.data.Time
37+
import org.apache.kafka.connect.data.Timestamp
3138
import org.junit.jupiter.api.Test
3239
import org.neo4j.connectors.kafka.data.DynamicTypes
3340
import org.neo4j.connectors.kafka.data.PropertyType
3441
import org.neo4j.connectors.kafka.data.PropertyType.schema
42+
import org.neo4j.connectors.kafka.testing.DateSupport
3543
import org.neo4j.connectors.kafka.testing.TestSupport.runTest
3644
import org.neo4j.connectors.kafka.testing.format.KafkaConverter
3745
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -125,6 +133,52 @@ abstract class Neo4jNodePatternIT {
125133
}
126134
}
127135

136+
@Neo4jSink(
137+
nodePattern =
138+
[
139+
NodePatternStrategy(
140+
TOPIC, "(:User{!id,height,dob,tob,tsob})", mergeNodeProperties = false)])
141+
@Test
142+
fun `should create node from struct containing connect types`(
143+
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
144+
session: Session
145+
) = runTest {
146+
session.run("CREATE CONSTRAINT FOR (n:User) REQUIRE n.id IS KEY").consume()
147+
148+
SchemaBuilder.struct()
149+
.field("id", Schema.INT64_SCHEMA)
150+
.field("height", Decimal.schema(2))
151+
.field("dob", Date.SCHEMA)
152+
.field("tob", Time.SCHEMA)
153+
.field("tsob", Timestamp.SCHEMA)
154+
.build()
155+
.let { schema ->
156+
producer.publish(
157+
valueSchema = schema,
158+
value =
159+
Struct(schema)
160+
.put("id", 1L)
161+
.put("height", BigDecimal.valueOf(185, 2))
162+
.put("dob", DateSupport.date(1978, 1, 15))
163+
.put("tob", DateSupport.time(7, 45, 12, 999))
164+
.put("tsob", DateSupport.timestamp(1978, 1, 15, 7, 45, 12, 999)))
165+
}
166+
167+
eventually(30.seconds) { session.run("MATCH (n:User) RETURN n", emptyMap()).single() }
168+
.get("n")
169+
.asNode() should
170+
{
171+
it.labels() shouldBe listOf("User")
172+
it.asMap() shouldBe
173+
mapOf(
174+
"id" to 1L,
175+
"height" to "1.85",
176+
"dob" to LocalDate.of(1978, 1, 15),
177+
"tob" to LocalTime.of(7, 45, 12, 999000000),
178+
"tsob" to LocalDateTime.of(1978, 1, 15, 7, 45, 12, 999000000))
179+
}
180+
}
181+
128182
@Neo4jSink(
129183
nodePattern =
130184
[NodePatternStrategy(TOPIC, "(:User{!id,name,surname})", mergeNodeProperties = false)])

0 commit comments

Comments
 (0)