Skip to content

Commit 04f96ef

Browse files
authored
refactor: re-implement cypher sink strategy (#66)
1 parent 2d5343a commit 04f96ef

File tree

28 files changed

+1536
-122
lines changed

28 files changed

+1536
-122
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/DeprecatedNeo4jConfiguration.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ object ConfigGroup {
3535
const val ENCRYPTION = "Encryption"
3636
const val CONNECTION = "Connection"
3737
const val AUTHENTICATION = "Authentication"
38-
const val TOPIC_CYPHER_MAPPING = "Topic Cypher Mapping"
38+
const val STRATEGIES = "Mapping Strategies"
3939
const val BATCH = "Batch Management"
4040
const val RETRY = "Retry Strategy"
4141
}

common/src/main/kotlin/org/neo4j/connectors/kafka/data/Types.kt

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,32 +46,34 @@ enum class SimpleTypes(private val schema: Schema, private val optionalSchema: S
4646
STRING(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
4747
BYTES(Schema.BYTES_SCHEMA, Schema.OPTIONAL_BYTES_SCHEMA),
4848
LOCALDATE(
49-
SchemaBuilder(Schema.Type.STRING).namespaced("LocalDate").build(),
50-
SchemaBuilder(Schema.Type.STRING).namespaced("OptionalLocalDate").optional().build()),
49+
SchemaBuilder(Schema.Type.STRING).namespaced("LocalDate").version(1).build(),
50+
SchemaBuilder(Schema.Type.STRING).namespaced("LocalDate").version(1).optional().build()),
5151
LOCALDATETIME(
52-
SchemaBuilder(Schema.Type.STRING).namespaced("LocalDateTime").build(),
53-
SchemaBuilder(Schema.Type.STRING).namespaced("OptionalLocalDateTime").optional().build()),
52+
SchemaBuilder(Schema.Type.STRING).namespaced("LocalDateTime").version(1).build(),
53+
SchemaBuilder(Schema.Type.STRING).namespaced("LocalDateTime").version(1).optional().build()),
5454
LOCALTIME(
55-
SchemaBuilder(Schema.Type.STRING).namespaced("LocalTime").build(),
56-
SchemaBuilder(Schema.Type.STRING).namespaced("OptionalLocalTime").optional().build()),
55+
SchemaBuilder(Schema.Type.STRING).namespaced("LocalTime").version(1).build(),
56+
SchemaBuilder(Schema.Type.STRING).namespaced("LocalTime").version(1).optional().build()),
5757
@Suppress("SpellCheckingInspection")
5858
ZONEDDATETIME(
59-
SchemaBuilder(Schema.Type.STRING).namespaced("ZonedDateTime").build(),
60-
SchemaBuilder(Schema.Type.STRING).namespaced("OptionalZonedDateTime").optional().build()),
59+
SchemaBuilder(Schema.Type.STRING).namespaced("ZonedDateTime").version(1).build(),
60+
SchemaBuilder(Schema.Type.STRING).namespaced("ZonedDateTime").version(1).optional().build()),
6161
@Suppress("SpellCheckingInspection")
6262
OFFSETTIME(
63-
SchemaBuilder(Schema.Type.STRING).namespaced("OffsetTime").build(),
64-
SchemaBuilder(Schema.Type.STRING).namespaced("OptionalOffsetTime").optional().build()),
63+
SchemaBuilder(Schema.Type.STRING).namespaced("OffsetTime").version(1).build(),
64+
SchemaBuilder(Schema.Type.STRING).namespaced("OffsetTime").version(1).optional().build()),
6565
DURATION(
6666
SchemaBuilder(Schema.Type.STRUCT)
6767
.namespaced("Duration")
68+
.version(1)
6869
.field("months", Schema.INT64_SCHEMA)
6970
.field("days", Schema.INT64_SCHEMA)
7071
.field("seconds", Schema.INT64_SCHEMA)
7172
.field("nanoseconds", Schema.INT32_SCHEMA)
7273
.build(),
7374
SchemaBuilder(Schema.Type.STRUCT)
74-
.namespaced("OptionalDuration")
75+
.namespaced("Duration")
76+
.version(1)
7577
.field("months", Schema.INT64_SCHEMA)
7678
.field("days", Schema.INT64_SCHEMA)
7779
.field("seconds", Schema.INT64_SCHEMA)
@@ -81,13 +83,15 @@ enum class SimpleTypes(private val schema: Schema, private val optionalSchema: S
8183
POINT(
8284
SchemaBuilder(Schema.Type.STRUCT)
8385
.namespaced("Point")
86+
.version(1)
8487
.field("srid", Schema.INT32_SCHEMA)
8588
.field("x", Schema.FLOAT64_SCHEMA)
8689
.field("y", Schema.FLOAT64_SCHEMA)
8790
.field("z", Schema.FLOAT64_SCHEMA)
8891
.build(),
8992
SchemaBuilder(Schema.Type.STRUCT)
90-
.namespaced("OptionalPoint")
93+
.namespaced("Point")
94+
.version(1)
9195
.field("srid", Schema.INT32_SCHEMA)
9296
.field("x", Schema.FLOAT64_SCHEMA)
9397
.field("y", Schema.FLOAT64_SCHEMA)
@@ -217,15 +221,20 @@ object DynamicTypes {
217221

218222
return when (schema) {
219223
SimpleTypes.BOOLEAN.schema(false),
220-
SimpleTypes.BOOLEAN.schema(true) -> value as Boolean
224+
SimpleTypes.BOOLEAN.schema(true) -> value as Boolean?
221225
SimpleTypes.LONG.schema(false),
222226
SimpleTypes.LONG.schema(true) -> value as Long?
223227
SimpleTypes.FLOAT.schema(false),
224228
SimpleTypes.FLOAT.schema(true) -> value as Double?
225229
SimpleTypes.STRING.schema(false),
226230
SimpleTypes.STRING.schema(true) -> value as String?
227231
SimpleTypes.BYTES.schema(false),
228-
SimpleTypes.BYTES.schema(true) -> value as ByteArray?
232+
SimpleTypes.BYTES.schema(true) ->
233+
when (value) {
234+
is ByteArray -> value
235+
is ByteBuffer -> value.array()
236+
else -> throw IllegalArgumentException("unsupported bytes type ${value.javaClass.name}")
237+
}
229238
SimpleTypes.LOCALDATE.schema(false),
230239
SimpleTypes.LOCALDATE.schema(true) ->
231240
(value as String?)?.let {
@@ -350,6 +359,22 @@ object DynamicTypes {
350359
result
351360
}
352361
}
362+
Schema.Type.BYTES ->
363+
when (value) {
364+
is ByteArray -> value
365+
is ByteBuffer -> value.array()
366+
else ->
367+
throw IllegalArgumentException(
368+
"unsupported bytes type ${value.javaClass.name}")
369+
}
370+
Schema.Type.STRING -> value as String?
371+
Schema.Type.INT8 -> value as Byte?
372+
Schema.Type.INT16 -> value as Short?
373+
Schema.Type.INT32 -> value as Int?
374+
Schema.Type.INT64 -> value as Long?
375+
Schema.Type.FLOAT32 -> value as Float?
376+
Schema.Type.FLOAT64 -> value as Double?
377+
Schema.Type.BOOLEAN -> value as Boolean?
353378
else ->
354379
throw IllegalArgumentException(
355380
"unsupported schema ($schema) and value type (${value.javaClass.name})")
@@ -397,7 +422,6 @@ object DynamicTypes {
397422
is Point -> SimpleTypes.POINT.schema(optional)
398423
is Node ->
399424
SchemaBuilder.struct()
400-
.namespaced("Node")
401425
.apply {
402426
field("<id>", SimpleTypes.LONG.schema())
403427
field("<labels>", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
@@ -412,7 +436,6 @@ object DynamicTypes {
412436
.build()
413437
is Relationship ->
414438
SchemaBuilder.struct()
415-
.namespaced("Relationship")
416439
.apply {
417440
field("<id>", SimpleTypes.LONG.schema())
418441
field("<type>", SimpleTypes.STRING.schema())

common/src/main/resources/neo4j-sink-configuration.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ neo4j.pattern.relationship.merge-properties=Type: Boolean;\nDescription: If enab
2727
using Sink `Relationship pattern` strategy (default false)
2828
neo4j.batch-size=Type: Integer;\nDescription: Max number of messages processed per batch.
2929
neo4j.batch-timeout=Type: Duration;\nDescription: Maximum amount of time a batch is allowed to be processed.
30+
neo4j.cypher.bind-header-as=Type: String;\nDescription: Under what name message header will be bound in user provided Cypher statements (default __header).
31+
neo4j.cypher.bind-key-as=Type: String;\nDescription: Under what name message key will be bound in user provided Cypher statements (default __key).
32+
neo4j.cypher.bind-value-as=Type: String;\nDescription: Under what name message value will be bound in user provided Cypher statements (default __value).
33+
neo4j.cypher.bind-value-as-event=Type: Boolean;\nDescription: Whether message value will be bound as 'event' in user provided Cypher statements for backward compatibility (default true).

common/src/test/kotlin/org/neo4j/connectors/kafka/data/DynamicTypesTest.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ class DynamicTypesTest {
196196
// Node
197197
DynamicTypes.toConnectSchema(TestNode(0, emptyList(), emptyMap()), false) shouldBe
198198
SchemaBuilder.struct()
199-
.name("org.neo4j.connectors.kafka.Node")
200199
.field("<id>", Schema.INT64_SCHEMA)
201200
.field("<labels>", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
202201
.build()
@@ -208,7 +207,6 @@ class DynamicTypesTest {
208207
mapOf("name" to Values.value("john"), "surname" to Values.value("doe"))),
209208
false) shouldBe
210209
SchemaBuilder.struct()
211-
.name("org.neo4j.connectors.kafka.Node")
212210
.field("<id>", Schema.INT64_SCHEMA)
213211
.field("<labels>", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
214212
.field("name", Schema.STRING_SCHEMA)
@@ -218,7 +216,6 @@ class DynamicTypesTest {
218216
// Relationship
219217
DynamicTypes.toConnectSchema(TestRelationship(0, 1, 2, "KNOWS", emptyMap()), false) shouldBe
220218
SchemaBuilder.struct()
221-
.name("org.neo4j.connectors.kafka.Relationship")
222219
.field("<id>", Schema.INT64_SCHEMA)
223220
.field("<type>", SchemaBuilder.STRING_SCHEMA)
224221
.field("<start.id>", Schema.INT64_SCHEMA)
@@ -233,7 +230,6 @@ class DynamicTypesTest {
233230
mapOf("name" to Values.value("john"), "surname" to Values.value("doe"))),
234231
false) shouldBe
235232
SchemaBuilder.struct()
236-
.name("org.neo4j.connectors.kafka.Relationship")
237233
.field("<id>", Schema.INT64_SCHEMA)
238234
.field("<type>", SchemaBuilder.STRING_SCHEMA)
239235
.field("<start.id>", Schema.INT64_SCHEMA)

common/src/test/kotlin/org/neo4j/connectors/kafka/data/TypesTest.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ class TypesTest {
126126
schemaAndValue(person).also { (schema, converted, reverted) ->
127127
schema shouldBe
128128
SchemaBuilder.struct()
129-
.name("org.neo4j.connectors.kafka.Node")
130129
.field("<id>", SimpleTypes.LONG.schema())
131130
.field("<labels>", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
132131
.field("name", SimpleTypes.STRING.schema())
@@ -155,7 +154,6 @@ class TypesTest {
155154
schemaAndValue(company).also { (schema, converted, reverted) ->
156155
schema shouldBe
157156
SchemaBuilder.struct()
158-
.name("org.neo4j.connectors.kafka.Node")
159157
.field("<id>", SimpleTypes.LONG.schema())
160158
.field("<labels>", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
161159
.field("name", SimpleTypes.STRING.schema())
@@ -181,7 +179,6 @@ class TypesTest {
181179
schemaAndValue(worksFor).also { (schema, converted, reverted) ->
182180
schema shouldBe
183181
SchemaBuilder.struct()
184-
.name("org.neo4j.connectors.kafka.Relationship")
185182
.field("<id>", SimpleTypes.LONG.schema())
186183
.field("<type>", SimpleTypes.STRING.schema())
187184
.field("<start.id>", SimpleTypes.LONG.schema())

docker/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ services:
101101
- schema-registry
102102
ports:
103103
- "8083:8083"
104+
- "5005:5005"
104105
volumes:
105106
- ./plugins:/tmp/connect-plugins
106107
environment:
@@ -123,6 +124,7 @@ services:
123124
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
124125
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
125126
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
127+
JAVA_TOOL_OPTIONS: "-agentlib:jdwp=transport=dt_socket,address=*:5005,server=y,suspend=n"
126128
command:
127129
- bash
128130
- -c

legacy-connectors/src/test/kotlin/streams/kafka/connect/sink/LegacyNeo4jSinkIT.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.neo4j.driver.Session
3535

3636
abstract class LegacyNeo4jSinkIT {
3737
companion object {
38-
const val TOPIC = "persons"
38+
private const val TOPIC = "persons"
3939
}
4040

4141
@LegacyNeo4jSink(

legacy-connectors/src/test/kotlin/streams/kafka/connect/source/LegacyNeo4jSourceIT.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.neo4j.driver.Session
3434
abstract class LegacyNeo4jSourceIT {
3535

3636
companion object {
37-
const val TOPIC = "neo4j-source-topic"
37+
private const val TOPIC = "neo4j-source-topic"
3838
}
3939

4040
@LegacyNeo4jSource(

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
<maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version>
7575
<mockito-kotlin.version>5.1.0</mockito-kotlin.version>
7676
<mockito.version>5.5.0</mockito.version>
77-
<neo4j-cypher-dsl.version>2022.9.1</neo4j-cypher-dsl.version>
77+
<neo4j-cypher-dsl.version>2022.9.2</neo4j-cypher-dsl.version>
7878
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
7979
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
8080
<protobuf.version>3.23.2</protobuf.version>

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCdcSchemaFromStreamsMessageIT.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import org.neo4j.driver.Session
4848
class Neo4jCdcSchemaFromStreamsMessageIT {
4949

5050
companion object {
51-
const val TOPIC = "schema"
51+
private const val TOPIC = "schema"
5252
}
5353

5454
@Neo4jSink(cdcSchema = [CdcSchemaStrategy(TOPIC)])

0 commit comments

Comments
 (0)