Skip to content

Commit 8bb7123

Browse files
authored
feat: implement configurable cdc source value serialization strategy (#169)
1 parent f72340e commit 8bb7123

File tree

11 files changed

+297
-113
lines changed

11 files changed

+297
-113
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,26 @@ class ChangeEventConverter() {
315315
}
316316
}
317317

318+
fun SchemaAndValue.extractEventSchema(): Schema {
319+
return this.schema().field("event").schema()
320+
}
321+
322+
fun SchemaAndValue.extractEventValue(): Struct {
323+
val value = this.value()
324+
if (value !is Struct) {
325+
throw IllegalArgumentException(
326+
"expected value to be a struct, but got: ${value?.javaClass}",
327+
)
328+
}
329+
val eventData = value.get("event")
330+
if (eventData !is Struct) {
331+
throw IllegalArgumentException(
332+
"expected event attribute to be a struct, but got: ${value.javaClass}",
333+
)
334+
}
335+
return eventData
336+
}
337+
318338
fun Struct.toChangeEvent(): ChangeEvent =
319339
ChangeEvent(
320340
ChangeIdentifier(getString("id")),

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceKeyStrategyIT.kt

Lines changed: 17 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import io.kotest.matchers.nulls.shouldNotBeNull
2121
import java.time.Duration
2222
import org.assertj.core.api.Assertions.assertThat
2323
import org.junit.jupiter.api.Test
24-
import org.junit.jupiter.api.TestInfo
2524
import org.neo4j.cdc.client.model.ChangeEvent
2625
import org.neo4j.cdc.client.model.EntityOperation
2726
import org.neo4j.cdc.client.model.EventType
@@ -53,24 +52,17 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
5352
CdcSourceTopic(
5453
topic = "neo4j-cdc-topic-key-serialization-none",
5554
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
56-
keySerialization = "SKIP"),
55+
keySerializationStrategy = "SKIP"),
5756
),
5857
),
5958
)
6059
@Test
6160
fun `supports skipping serialization of keys`(
62-
testInfo: TestInfo,
6361
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-none", offset = "earliest")
6462
consumer: ConvertingKafkaConsumer,
6563
session: Session
6664
) {
67-
val executionId = testInfo.displayName + System.currentTimeMillis()
68-
session
69-
.run(
70-
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
71-
mapOf("execId" to executionId),
72-
)
73-
.consume()
65+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
7466

7567
TopicVerifier.createForMap(consumer)
7668
.assertMessage { it.raw.key().shouldBeNull() }
@@ -87,24 +79,17 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
8779
CdcSourceTopic(
8880
topic = "neo4j-cdc-topic-key-serialization-whole",
8981
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
90-
keySerialization = "WHOLE_VALUE"),
82+
keySerializationStrategy = "WHOLE_VALUE"),
9183
),
9284
),
9385
)
9486
@Test
9587
fun `supports serialization of keys as whole values`(
96-
testInfo: TestInfo,
9788
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-whole", offset = "earliest")
9889
consumer: ConvertingKafkaConsumer,
9990
session: Session
10091
) {
101-
val executionId = testInfo.displayName + System.currentTimeMillis()
102-
session
103-
.run(
104-
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
105-
mapOf("execId" to executionId),
106-
)
107-
.consume()
92+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
10893

10994
TopicVerifier.create<ChangeEvent, ChangeEvent>(consumer)
11095
.assertMessageKey { key ->
@@ -114,7 +99,7 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
11499
.hasOperation(EntityOperation.CREATE)
115100
.labelledAs("TestSource")
116101
.hasNoBeforeState()
117-
.hasAfterStateProperties(mapOf("name" to "Jane", "execId" to executionId))
102+
.hasAfterStateProperties(mapOf("name" to "Jane"))
118103
}
119104
.verifyWithin(Duration.ofSeconds(30))
120105
}
@@ -129,24 +114,17 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
129114
CdcSourceTopic(
130115
topic = "neo4j-cdc-topic-key-serialization-element-ids",
131116
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
132-
keySerialization = "ELEMENT_ID"),
117+
keySerializationStrategy = "ELEMENT_ID"),
133118
),
134119
),
135120
)
136121
@Test
137122
fun `supports serialization of keys as element IDs`(
138-
testInfo: TestInfo,
139123
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-element-ids", offset = "earliest")
140124
consumer: ConvertingKafkaConsumer,
141125
session: Session
142126
) {
143-
val executionId = testInfo.displayName + System.currentTimeMillis()
144-
session
145-
.run(
146-
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
147-
mapOf("execId" to executionId),
148-
)
149-
.consume()
127+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
150128

151129
TopicVerifier.create<String, Map<String, Any>>(consumer)
152130
.assertMessageKey { it.shouldNotBeNull() }
@@ -163,25 +141,18 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
163141
CdcSourceTopic(
164142
topic = "neo4j-cdc-topic-key-serialization-missing-node-keys",
165143
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
166-
keySerialization = "ENTITY_KEYS"),
144+
keySerializationStrategy = "ENTITY_KEYS"),
167145
),
168146
),
169147
)
170148
@Test
171149
fun `supports serialization of keys as (missing) node keys`(
172-
testInfo: TestInfo,
173150
@TopicConsumer(
174151
topic = "neo4j-cdc-topic-key-serialization-missing-node-keys", offset = "earliest")
175152
consumer: ConvertingKafkaConsumer,
176153
session: Session
177154
) {
178-
val executionId = testInfo.displayName + System.currentTimeMillis()
179-
session
180-
.run(
181-
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
182-
mapOf("execId" to executionId),
183-
)
184-
.consume()
155+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
185156

186157
TopicVerifier.create<Map<String, Any>, Map<String, Any>>(consumer)
187158
.assertMessageKey { it.shouldBeNull() }
@@ -198,30 +169,18 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
198169
CdcSourceTopic(
199170
topic = "neo4j-cdc-topic-key-serialization-node-keys",
200171
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
201-
keySerialization = "ENTITY_KEYS"),
172+
keySerializationStrategy = "ENTITY_KEYS"),
202173
),
203174
),
204175
)
205176
@Test
206177
fun `supports serialization of keys as node keys`(
207-
testInfo: TestInfo,
208178
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-node-keys", offset = "earliest")
209179
consumer: ConvertingKafkaConsumer,
210180
session: Session
211181
) {
212-
val executionId = testInfo.displayName + System.currentTimeMillis()
213-
session
214-
.run(
215-
"CREATE CONSTRAINT FOR (ts:TestSource) REQUIRE ts.name IS NODE KEY",
216-
mapOf("execId" to executionId),
217-
)
218-
.consume()
219-
session
220-
.run(
221-
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
222-
mapOf("execId" to executionId),
223-
)
224-
.consume()
182+
session.run("CREATE CONSTRAINT FOR (ts:TestSource) REQUIRE ts.name IS NODE KEY").consume()
183+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
225184

226185
TopicVerifier.createForMap(consumer)
227186
.assertMessageKey {
@@ -242,25 +201,18 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
242201
CdcSourceTopic(
243202
topic = "neo4j-cdc-topic-key-serialization-missing-rel-keys",
244203
patterns = arrayOf(CdcSourceParam("()-[:TO {name,+execId}]-()")),
245-
keySerialization = "ENTITY_KEYS"),
204+
keySerializationStrategy = "ENTITY_KEYS"),
246205
),
247206
),
248207
)
249208
@Test
250209
fun `supports serialization of keys as (missing) rel keys`(
251-
testInfo: TestInfo,
252210
@TopicConsumer(
253211
topic = "neo4j-cdc-topic-key-serialization-missing-rel-keys", offset = "earliest")
254212
consumer: ConvertingKafkaConsumer,
255213
session: Session
256214
) {
257-
val executionId = testInfo.displayName + System.currentTimeMillis()
258-
session
259-
.run(
260-
"CREATE (:Source)-[:TO {name: 'somewhere', execId: \$execId}]->(:Destination)",
261-
mapOf("execId" to executionId),
262-
)
263-
.consume()
215+
session.run("CREATE (:Source)-[:TO {name: 'somewhere'}]->(:Destination)").consume()
264216

265217
TopicVerifier.create<String, Map<String, Any>>(consumer)
266218
.assertMessageKey { it.shouldBeNull() }
@@ -277,30 +229,18 @@ abstract class Neo4jCdcSourceKeyStrategyIT {
277229
CdcSourceTopic(
278230
topic = "neo4j-cdc-topic-key-serialization-rel-keys",
279231
patterns = arrayOf(CdcSourceParam("()-[:TO {name,+execId}]-()")),
280-
keySerialization = "ENTITY_KEYS"),
232+
keySerializationStrategy = "ENTITY_KEYS"),
281233
),
282234
),
283235
)
284236
@Test
285237
fun `supports serialization of keys as rel keys`(
286-
testInfo: TestInfo,
287238
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-rel-keys", offset = "earliest")
288239
consumer: ConvertingKafkaConsumer,
289240
session: Session
290241
) {
291-
val executionId = testInfo.displayName + System.currentTimeMillis()
292-
session
293-
.run(
294-
"CREATE CONSTRAINT FOR ()-[to:TO]-() REQUIRE to.name IS RELATIONSHIP KEY",
295-
mapOf("execId" to executionId),
296-
)
297-
.consume()
298-
session
299-
.run(
300-
"CREATE (:Source)-[:TO {name: 'somewhere', execId: \$execId}]->(:Destination)",
301-
mapOf("execId" to executionId),
302-
)
303-
.consume()
242+
session.run("CREATE CONSTRAINT FOR ()-[to:TO]-() REQUIRE to.name IS RELATIONSHIP KEY").consume()
243+
session.run("CREATE (:Source)-[:TO {name: 'somewhere'}]->(:Destination)").consume()
304244

305245
TopicVerifier.createForMap(consumer)
306246
.assertMessageKey { key ->
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.source
18+
19+
import io.kotest.matchers.shouldBe
20+
import io.kotest.matchers.shouldNotBe
21+
import java.time.Duration
22+
import org.junit.jupiter.api.Test
23+
import org.neo4j.cdc.client.model.ChangeEvent
24+
import org.neo4j.cdc.client.model.EntityOperation
25+
import org.neo4j.cdc.client.model.EventType
26+
import org.neo4j.connectors.kafka.testing.MapSupport.excludingKeys
27+
import org.neo4j.connectors.kafka.testing.assertions.ChangeEventAssert
28+
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
29+
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.AVRO
30+
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_EMBEDDED
31+
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_SCHEMA
32+
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.PROTOBUF
33+
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
34+
import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaConsumer
35+
import org.neo4j.connectors.kafka.testing.source.CdcSource
36+
import org.neo4j.connectors.kafka.testing.source.CdcSourceParam
37+
import org.neo4j.connectors.kafka.testing.source.CdcSourceTopic
38+
import org.neo4j.connectors.kafka.testing.source.Neo4jSource
39+
import org.neo4j.connectors.kafka.testing.source.SourceStrategy
40+
import org.neo4j.connectors.kafka.testing.source.TopicConsumer
41+
import org.neo4j.driver.Session
42+
43+
abstract class Neo4jCdcSourceValueStrategyIT {
44+
@Neo4jSource(
45+
startFrom = "EARLIEST",
46+
strategy = SourceStrategy.CDC,
47+
cdc =
48+
CdcSource(
49+
topics =
50+
arrayOf(
51+
CdcSourceTopic(
52+
topic = "neo4j-cdc-topic-value-serialization-change-event",
53+
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
54+
valueSerializationStrategy = "CHANGE_EVENT"),
55+
),
56+
),
57+
)
58+
@Test
59+
fun `supports serialization of values as whole change event`(
60+
@TopicConsumer(
61+
topic = "neo4j-cdc-topic-value-serialization-change-event", offset = "earliest")
62+
consumer: ConvertingKafkaConsumer,
63+
session: Session
64+
) {
65+
66+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
67+
68+
TopicVerifier.create<ChangeEvent, ChangeEvent>(consumer)
69+
.assertMessageValue {
70+
ChangeEventAssert.assertThat(it)
71+
.isNotNull()
72+
.hasOperation(EntityOperation.CREATE)
73+
.labelledAs("TestSource")
74+
.hasNoBeforeState()
75+
.hasAfterStateProperties(mapOf("name" to "Jane"))
76+
}
77+
.verifyWithin(Duration.ofSeconds(30))
78+
}
79+
80+
@Neo4jSource(
81+
startFrom = "EARLIEST",
82+
strategy = SourceStrategy.CDC,
83+
cdc =
84+
CdcSource(
85+
topics =
86+
arrayOf(
87+
CdcSourceTopic(
88+
topic = "neo4j-cdc-topic-value-serialization-entity-event",
89+
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
90+
valueSerializationStrategy = "ENTITY_EVENT"),
91+
),
92+
),
93+
)
94+
@Test
95+
fun `supports serialization of values as only event entity`(
96+
@TopicConsumer(
97+
topic = "neo4j-cdc-topic-value-serialization-entity-event", offset = "earliest")
98+
consumer: ConvertingKafkaConsumer,
99+
session: Session
100+
) {
101+
session.run("CREATE (:TestSource {name: 'Jane'})").consume()
102+
103+
TopicVerifier.createForMap(consumer)
104+
.assertMessageValue {
105+
it["elementId"] shouldNotBe null
106+
it.excludingKeys("elementId") shouldBe
107+
mapOf(
108+
"eventType" to EventType.NODE.name,
109+
"operation" to EntityOperation.CREATE.name,
110+
"labels" to listOf("TestSource"),
111+
"keys" to emptyMap<Any, Any>(),
112+
"state" to
113+
mapOf(
114+
"after" to
115+
mapOf(
116+
"labels" to listOf("TestSource"),
117+
"properties" to mapOf("name" to "Jane"))))
118+
}
119+
.verifyWithin(Duration.ofSeconds(30))
120+
}
121+
}
122+
123+
@KeyValueConverter(key = AVRO, value = AVRO)
124+
class Neo4jCdcSourceValueStrategyAvroIT : Neo4jCdcSourceValueStrategyIT()
125+
126+
@KeyValueConverter(key = JSON_SCHEMA, value = JSON_SCHEMA)
127+
class Neo4jCdcSourceValueStrategyJsonSchemaIT : Neo4jCdcSourceValueStrategyIT()
128+
129+
@KeyValueConverter(key = JSON_EMBEDDED, value = JSON_EMBEDDED)
130+
class Neo4jCdcSourceValueStrategyJsonEmbeddedIT : Neo4jCdcSourceValueStrategyIT()
131+
132+
@KeyValueConverter(key = PROTOBUF, value = PROTOBUF)
133+
class Neo4jCdcSourceValueStrategyProtobufIT : Neo4jCdcSourceValueStrategyIT()

0 commit comments

Comments
 (0)