Skip to content

Commit b33483a

Browse files
authored
feat: support new cdc messages for cdc strategies (#61)
1 parent 39431db commit b33483a

File tree

44 files changed

+2573
-154
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2573
-154
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ object ChangeEventExtensions {
5252

5353
private fun ChangeEvent.toConnectSchema(): Schema =
5454
SchemaBuilder.struct()
55-
.namespaced("cdc.ChangeEvent")
5655
.field("id", SimpleTypes.STRING.schema())
5756
.field("txId", SimpleTypes.LONG.schema())
5857
.field("seq", SimpleTypes.LONG.schema())
@@ -71,7 +70,6 @@ object ChangeEventExtensions {
7170

7271
internal fun Metadata.toConnectSchema(): Schema =
7372
SchemaBuilder.struct()
74-
.namespaced("cdc.Metadata")
7573
.field("authenticatedUser", SimpleTypes.STRING.schema())
7674
.field("executingUser", SimpleTypes.STRING.schema())
7775
.field("connectionType", SimpleTypes.STRING.schema(true))
@@ -148,7 +146,6 @@ object ChangeEventExtensions {
148146

149147
internal fun NodeEvent.toConnectSchema(): Schema =
150148
SchemaBuilder.struct()
151-
.namespaced("cdc.NodeEvent")
152149
.field("elementId", SimpleTypes.STRING.schema())
153150
.field("eventType", SimpleTypes.STRING.schema())
154151
.field("operation", SimpleTypes.STRING.schema())
@@ -185,7 +182,6 @@ object ChangeEventExtensions {
185182
private fun nodeStateSchema(before: NodeState?, after: NodeState?): Schema {
186183
val stateSchema =
187184
SchemaBuilder.struct()
188-
.namespaced("cdc.NodeState")
189185
.apply {
190186
this.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
191187
this.field(
@@ -207,11 +203,7 @@ object ChangeEventExtensions {
207203
.optional()
208204
.build()
209205

210-
return SchemaBuilder.struct()
211-
.namespaced("cdc.NodeStates")
212-
.field("before", stateSchema)
213-
.field("after", stateSchema)
214-
.build()
206+
return SchemaBuilder.struct().field("before", stateSchema).field("after", stateSchema).build()
215207
}
216208

217209
private fun nodeStateValue(schema: Schema, before: NodeState?, after: NodeState?): Struct =
@@ -263,7 +255,6 @@ object ChangeEventExtensions {
263255

264256
internal fun RelationshipEvent.toConnectSchema(): Schema =
265257
SchemaBuilder.struct()
266-
.namespaced("cdc.RelationshipEvent")
267258
.field("elementId", SimpleTypes.STRING.schema())
268259
.field("eventType", SimpleTypes.STRING.schema())
269260
.field("operation", SimpleTypes.STRING.schema())
@@ -307,7 +298,6 @@ object ChangeEventExtensions {
307298

308299
internal fun Node.toConnectSchema(): Schema {
309300
return SchemaBuilder.struct()
310-
.namespaced("cdc.Node")
311301
.field("elementId", SimpleTypes.STRING.schema())
312302
.field("labels", SchemaBuilder.array(SimpleTypes.STRING.schema()).build())
313303
.field("keys", schemaForKeysByLabel(this.keys))
@@ -336,7 +326,6 @@ object ChangeEventExtensions {
336326
): Schema {
337327
val stateSchema =
338328
SchemaBuilder.struct()
339-
.namespaced("cdc.RelationshipState")
340329
.apply {
341330
this.field(
342331
"properties",
@@ -357,11 +346,7 @@ object ChangeEventExtensions {
357346
.optional()
358347
.build()
359348

360-
return SchemaBuilder.struct()
361-
.namespaced("cdc.RelationshipStates")
362-
.field("before", stateSchema)
363-
.field("after", stateSchema)
364-
.build()
349+
return SchemaBuilder.struct().field("before", stateSchema).field("after", stateSchema).build()
365350
}
366351

367352
private fun relationshipStateValue(

common/src/main/kotlin/org/neo4j/connectors/kafka/data/Headers.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ fun SinkRecord.isCdcMessage(): Boolean =
3939
this.headers()?.any { header: Header? -> header?.key() == Headers.KEY_CDC_ID } ?: false
4040

4141
fun SinkRecord.cdcTxId(): Long? =
42-
this.headers()?.singleOrNull { it.key() == Headers.KEY_CDC_TX_ID }?.value() as Long?
42+
(this.headers()?.singleOrNull { it.key() == Headers.KEY_CDC_TX_ID }?.value() as Number?)
43+
?.toLong()
4344

4445
fun SinkRecord.cdcTxSeq(): Int? =
45-
this.headers()?.singleOrNull { it.key() == Headers.KEY_CDC_TX_SEQ }?.value() as Int?
46+
(this.headers()?.singleOrNull { it.key() == Headers.KEY_CDC_TX_SEQ }?.value() as Number?)
47+
?.toInt()
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.data
18+
19+
import java.time.Instant
20+
import java.time.ZoneOffset
21+
import java.time.ZonedDateTime
22+
import org.neo4j.cdc.client.model.CaptureMode
23+
import org.neo4j.cdc.client.model.ChangeEvent
24+
import org.neo4j.cdc.client.model.ChangeIdentifier
25+
import org.neo4j.cdc.client.model.EntityOperation
26+
import org.neo4j.cdc.client.model.Metadata
27+
import org.neo4j.cdc.client.model.Node
28+
import org.neo4j.cdc.client.model.NodeEvent
29+
import org.neo4j.cdc.client.model.NodeState
30+
import org.neo4j.cdc.client.model.RelationshipEvent
31+
import org.neo4j.cdc.client.model.RelationshipState
32+
import org.neo4j.cdc.client.model.State
33+
import org.neo4j.connectors.kafka.events.NodePayload
34+
import org.neo4j.connectors.kafka.events.OperationType
35+
import org.neo4j.connectors.kafka.events.RelationshipPayload
36+
import org.neo4j.connectors.kafka.events.StreamsConstraintType
37+
import org.neo4j.connectors.kafka.events.StreamsTransactionEvent
38+
39+
object StreamsTransactionEventExtensions {
40+
41+
fun StreamsTransactionEvent.toChangeEvent(): ChangeEvent {
42+
val cdcOperation =
43+
when (this.meta.operation) {
44+
OperationType.created -> EntityOperation.CREATE
45+
OperationType.updated -> EntityOperation.UPDATE
46+
OperationType.deleted -> EntityOperation.DELETE
47+
}
48+
val cdcMetadata =
49+
Metadata(
50+
this.meta.username,
51+
this.meta.username,
52+
"unknown",
53+
CaptureMode.OFF,
54+
"unknown",
55+
"unknown",
56+
"unknown",
57+
ZonedDateTime.ofInstant(Instant.ofEpochMilli(this.meta.timestamp), ZoneOffset.UTC),
58+
ZonedDateTime.ofInstant(Instant.ofEpochMilli(this.meta.timestamp), ZoneOffset.UTC),
59+
emptyMap(),
60+
emptyMap())
61+
val cdcEvent =
62+
when (val payload = this.payload) {
63+
is NodePayload -> {
64+
val before = payload.before?.let { NodeState(it.labels, it.properties) }
65+
val after = payload.after?.let { NodeState(it.labels, it.properties) }
66+
val referenceState = extractState(this, before, after)
67+
68+
val keys =
69+
this.schema.constraints
70+
.filter { c -> c.type == StreamsConstraintType.UNIQUE }
71+
.map { c ->
72+
c.label!! to c.properties.associateWith { referenceState.properties[it] }
73+
}
74+
.groupBy { it.first }
75+
.mapValues { it.value.map { p -> p.second } }
76+
77+
NodeEvent(payload.id, cdcOperation, referenceState.labels, keys, before, after)
78+
}
79+
is RelationshipPayload -> {
80+
val before = payload.before?.let { RelationshipState(it.properties) }
81+
val after = payload.after?.let { RelationshipState(it.properties) }
82+
83+
RelationshipEvent(
84+
payload.id,
85+
payload.label,
86+
Node(
87+
payload.start.id,
88+
payload.start.labels ?: emptyList(),
89+
buildMap {
90+
val label = payload.start.labels?.firstOrNull()
91+
if (label != null) {
92+
this[label] = listOf(payload.start.ids)
93+
}
94+
}),
95+
Node(
96+
payload.end.id,
97+
payload.end.labels ?: emptyList(),
98+
buildMap {
99+
val label = payload.end.labels?.firstOrNull()
100+
if (label != null) {
101+
this[label] = listOf(payload.end.ids)
102+
}
103+
}),
104+
emptyList(),
105+
cdcOperation,
106+
before,
107+
after)
108+
}
109+
else ->
110+
throw IllegalArgumentException("unexpected payload type ${payload.javaClass.name}")
111+
}
112+
113+
return ChangeEvent(
114+
ChangeIdentifier("${this.meta.txId}:${this.meta.txEventId}"),
115+
this.meta.txId,
116+
this.meta.txEventId,
117+
cdcMetadata,
118+
cdcEvent)
119+
}
120+
121+
private fun <T : State> extractState(event: StreamsTransactionEvent, before: T?, after: T?): T {
122+
val (name, referenceState) =
123+
when (event.meta.operation) {
124+
OperationType.created -> ("after" to after)
125+
OperationType.updated -> ("before" to before)
126+
OperationType.deleted -> ("before" to before)
127+
}
128+
require(referenceState != null) {
129+
"$name state should not be null for ${event.meta.operation} events"
130+
}
131+
return referenceState
132+
}
133+
}

common/src/main/kotlin/org/neo4j/connectors/kafka/utils/StreamsUtils.kt

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,6 @@ object StreamsUtils {
2222

2323
@JvmStatic val WITH_EVENT_FROM: String = "WITH event, from"
2424

25-
@JvmStatic val STREAMS_CONFIG_PREFIX = "streams."
26-
27-
@JvmStatic val STREAMS_SINK_TOPIC_PREFIX = "sink.topic.cypher."
28-
29-
@JvmStatic val LEADER = "LEADER"
30-
31-
@JvmStatic val SYSTEM_DATABASE_NAME = "system"
32-
3325
fun <T> ignoreExceptions(action: () -> T, vararg toIgnore: Class<out Throwable>): T? {
3426
return try {
3527
action()
@@ -44,11 +36,4 @@ object StreamsUtils {
4436
}
4537
}
4638
}
47-
48-
fun closeSafetely(closeable: AutoCloseable, onError: (Throwable) -> Unit = {}) =
49-
try {
50-
closeable.close()
51-
} catch (e: Throwable) {
52-
onError(e)
53-
}
5439
}

common/src/main/resources/neo4j-sink-configuration.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
## Connection Properties
1616
neo4j.cdc.sourceId.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `SourceId` strategy
1717
neo4j.cdc.sourceId.labelName=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
18-
neo4j.cdc.sourceId.idName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
18+
neo4j.cdc.sourceId.propertyName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
1919
neo4j.cdc.schema.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `Schema` strategy
2020
neo4j.batch-parallelize=Type: Boolean;\nDescription: If enabled messages are processed concurrently in the sink. \
2121
Non concurrent execution supports in-order processing, e.g. for CDC (default true)

0 commit comments

Comments
 (0)