Skip to content

Commit 967d5d7

Browse files
authored
feat: add headers for cdc generated messages (#59)
1 parent a561087 commit 967d5d7

File tree

9 files changed

+285
-22
lines changed

9 files changed

+285
-22
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.connect
18+
19+
import org.apache.kafka.connect.data.Schema
20+
import org.apache.kafka.connect.data.SchemaAndValue
21+
import org.apache.kafka.connect.header.Header
22+
23+
data class ConnectHeader(private val key: String, private val schemaAndValue: SchemaAndValue) :
24+
Header {
25+
26+
override fun key(): String = key
27+
28+
override fun schema(): Schema = schemaAndValue.schema()
29+
30+
override fun value(): Any = schemaAndValue.value()
31+
32+
override fun with(schema: Schema?, value: Any?): Header =
33+
ConnectHeader(key, SchemaAndValue(schema!!, value!!))
34+
35+
override fun rename(key: String?): Header = ConnectHeader(key!!, schemaAndValue)
36+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.data
18+
19+
import org.apache.kafka.connect.data.Schema
20+
import org.apache.kafka.connect.data.SchemaAndValue
21+
import org.apache.kafka.connect.header.Header
22+
import org.apache.kafka.connect.sink.SinkRecord
23+
import org.neo4j.cdc.client.model.ChangeEvent
24+
import org.neo4j.connectors.kafka.connect.ConnectHeader
25+
26+
object Headers {
27+
const val KEY_CDC_ID = "neo4j.source.cdc.id"
28+
const val KEY_CDC_TX_ID = "neo4j.source.cdc.tx-id"
29+
const val KEY_CDC_TX_SEQ = "neo4j.source.cdc.tx-seq"
30+
31+
fun from(event: ChangeEvent): List<Header> =
32+
listOf(
33+
ConnectHeader(KEY_CDC_ID, SchemaAndValue(Schema.STRING_SCHEMA, event.id.id)),
34+
ConnectHeader(KEY_CDC_TX_ID, SchemaAndValue(Schema.INT64_SCHEMA, event.txId)),
35+
ConnectHeader(KEY_CDC_TX_SEQ, SchemaAndValue(Schema.INT32_SCHEMA, event.seq)))
36+
}
37+
38+
fun SinkRecord.isCdcMessage(): Boolean =
39+
this.headers()?.any { header: Header? -> header?.key() == Headers.KEY_CDC_ID } ?: false
40+
41+
fun SinkRecord.cdcTxId(): Long? =
42+
this.headers()?.singleOrNull { it.key() == Headers.KEY_CDC_TX_ID }?.value() as Long?
43+
44+
fun SinkRecord.cdcTxSeq(): Int? =
45+
this.headers()?.singleOrNull { it.key() == Headers.KEY_CDC_TX_SEQ }?.value() as Int?

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
<!-- as well as versions in the docker-compose.yml file -->
5858
<kafka-schema-registry.version>6.1.13</kafka-schema-registry.version>
5959
<kafka.version>2.7.2</kafka.version>
60-
<kotest-assertions-core-jvm.version>5.6.2</kotest-assertions-core-jvm.version>
60+
<kotest-assertions-core-jvm.version>5.8.0</kotest-assertions-core-jvm.version>
6161
<kotlin.coroutines.version>1.7.3</kotlin.coroutines.version>
6262
<kotlin.version>1.9.0</kotlin.version>
6363
<license-maven-plugin.version>4.2</license-maven-plugin.version>
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.neo4j.connectors.kafka.source
19+
20+
import io.kotest.matchers.collections.shouldHaveSingleElement
21+
import io.kotest.matchers.collections.shouldHaveSize
22+
import java.time.Duration
23+
import org.apache.avro.generic.GenericRecord
24+
import org.apache.kafka.clients.consumer.KafkaConsumer
25+
import org.apache.kafka.connect.data.Schema
26+
import org.apache.kafka.connect.storage.SimpleHeaderConverter
27+
import org.junit.jupiter.api.Test
28+
import org.junit.jupiter.api.TestInfo
29+
import org.neo4j.connectors.kafka.connect.ConnectHeader
30+
import org.neo4j.connectors.kafka.data.Headers
31+
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
32+
import org.neo4j.connectors.kafka.testing.source.CdcSource
33+
import org.neo4j.connectors.kafka.testing.source.CdcSourceParam
34+
import org.neo4j.connectors.kafka.testing.source.CdcSourceTopic
35+
import org.neo4j.connectors.kafka.testing.source.Neo4jSource
36+
import org.neo4j.connectors.kafka.testing.source.SourceStrategy.CDC
37+
import org.neo4j.connectors.kafka.testing.source.TopicConsumer
38+
import org.neo4j.driver.Session
39+
40+
class Neo4jCdcSourceIT {
41+
42+
@Neo4jSource(
43+
startFrom = "EARLIEST",
44+
strategy = CDC,
45+
cdc =
46+
CdcSource(
47+
topics =
48+
arrayOf(
49+
CdcSourceTopic(
50+
topic = "neo4j-cdc-nodes-topic",
51+
patterns = arrayOf(CdcSourceParam("(:Person)"))),
52+
CdcSourceTopic(
53+
topic = "neo4j-cdc-rels-topic",
54+
patterns = arrayOf(CdcSourceParam("()-[:KNOWS]-()"))))))
55+
@Test
56+
fun `should place cdc related information into headers`(
57+
testInfo: TestInfo,
58+
@TopicConsumer(topic = "neo4j-cdc-nodes-topic", offset = "earliest")
59+
nodesConsumer: KafkaConsumer<String, GenericRecord>,
60+
@TopicConsumer(topic = "neo4j-cdc-rels-topic", offset = "earliest")
61+
relsConsumer: KafkaConsumer<String, GenericRecord>,
62+
session: Session
63+
) {
64+
val executionId = testInfo.displayName + System.currentTimeMillis()
65+
session
66+
.run(
67+
"""
68+
CREATE (p1:Person) SET p1 = ${'$'}person1
69+
CREATE (p2:Person) SET p2 = ${'$'}person2
70+
CREATE (p1)-[:KNOWS]->(p2)
71+
"""
72+
.trimIndent(),
73+
mapOf(
74+
"person1" to
75+
mapOf(
76+
"id" to 1L, "name" to "Jane", "surname" to "Doe", "execId" to executionId),
77+
"person2" to
78+
mapOf(
79+
"id" to 2L, "name" to "John", "surname" to "Doe", "execId" to executionId)))
80+
.consume()
81+
82+
TopicVerifier.create(nodesConsumer)
83+
.assertMessage { msg ->
84+
msg.headers()
85+
.map {
86+
ConnectHeader(
87+
it.key(), SimpleHeaderConverter().toConnectHeader("", it.key(), it.value()))
88+
}
89+
.asIterable()
90+
.shouldHaveSize(3)
91+
.shouldHaveSingleElement {
92+
it.key() == Headers.KEY_CDC_ID && it.schema() == Schema.STRING_SCHEMA
93+
}
94+
.shouldHaveSingleElement {
95+
it.key() == Headers.KEY_CDC_TX_ID &&
96+
listOf(
97+
Schema.INT8_SCHEMA,
98+
Schema.INT16_SCHEMA,
99+
Schema.INT32_SCHEMA,
100+
Schema.INT64_SCHEMA)
101+
.contains(it.schema())
102+
}
103+
.shouldHaveSingleElement {
104+
it.key() == Headers.KEY_CDC_TX_SEQ &&
105+
listOf(
106+
Schema.INT8_SCHEMA,
107+
Schema.INT16_SCHEMA,
108+
Schema.INT32_SCHEMA,
109+
Schema.INT64_SCHEMA)
110+
.contains(it.schema())
111+
}
112+
}
113+
.assertMessage { msg ->
114+
msg.headers()
115+
.map {
116+
ConnectHeader(
117+
it.key(), SimpleHeaderConverter().toConnectHeader("", it.key(), it.value()))
118+
}
119+
.asIterable()
120+
.shouldHaveSize(3)
121+
.shouldHaveSingleElement {
122+
it.key() == Headers.KEY_CDC_ID && it.schema() == Schema.STRING_SCHEMA
123+
}
124+
.shouldHaveSingleElement {
125+
it.key() == Headers.KEY_CDC_TX_ID &&
126+
listOf(
127+
Schema.INT8_SCHEMA,
128+
Schema.INT16_SCHEMA,
129+
Schema.INT32_SCHEMA,
130+
Schema.INT64_SCHEMA)
131+
.contains(it.schema())
132+
}
133+
.shouldHaveSingleElement {
134+
it.key() == Headers.KEY_CDC_TX_SEQ &&
135+
listOf(
136+
Schema.INT8_SCHEMA,
137+
Schema.INT16_SCHEMA,
138+
Schema.INT32_SCHEMA,
139+
Schema.INT64_SCHEMA)
140+
.contains(it.schema())
141+
}
142+
}
143+
.verifyWithin(Duration.ofSeconds(30))
144+
145+
TopicVerifier.create(relsConsumer)
146+
.assertMessage { msg ->
147+
msg.headers()
148+
.map {
149+
ConnectHeader(
150+
it.key(), SimpleHeaderConverter().toConnectHeader("", it.key(), it.value()))
151+
}
152+
.asIterable()
153+
.shouldHaveSize(3)
154+
.shouldHaveSingleElement {
155+
it.key() == Headers.KEY_CDC_ID && it.schema() == Schema.STRING_SCHEMA
156+
}
157+
.shouldHaveSingleElement {
158+
it.key() == Headers.KEY_CDC_TX_ID &&
159+
listOf(
160+
Schema.INT8_SCHEMA,
161+
Schema.INT16_SCHEMA,
162+
Schema.INT32_SCHEMA,
163+
Schema.INT64_SCHEMA)
164+
.contains(it.schema())
165+
}
166+
.shouldHaveSingleElement {
167+
it.key() == Headers.KEY_CDC_TX_SEQ &&
168+
listOf(
169+
Schema.INT8_SCHEMA,
170+
Schema.INT16_SCHEMA,
171+
Schema.INT32_SCHEMA,
172+
Schema.INT64_SCHEMA)
173+
.contains(it.schema())
174+
}
175+
}
176+
.verifyWithin(Duration.ofSeconds(30))
177+
}
178+
}

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,8 @@ class Neo4jCdcSourceNodesIT {
480480
.consume()
481481

482482
TopicVerifier.create(consumer)
483-
.assertMessageValue { value ->
484-
assertThat(value)
483+
.assertMessage { msg ->
484+
assertThat(msg.value())
485485
.hasEventType(EventType.NODE)
486486
.hasOperation(Operation.CREATE)
487487
.hasLabels(setOf("TestSource", "Employee"))

source/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcTask.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.neo4j.cdc.client.model.ChangeEvent
3535
import org.neo4j.cdc.client.model.ChangeIdentifier
3636
import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil
3737
import org.neo4j.connectors.kafka.data.ChangeEventExtensions.toConnectValue
38+
import org.neo4j.connectors.kafka.data.Headers
3839
import org.neo4j.driver.SessionConfig
3940
import org.slf4j.Logger
4041
import org.slf4j.LoggerFactory
@@ -125,10 +126,13 @@ class Neo4jCdcTask : SourceTask() {
125126
config.partition,
126127
mapOf("value" to changeEvent.id.id),
127128
topic,
129+
null,
128130
keyStrategy.schema(transformedValue),
129131
keyStrategy.value(transformedValue),
130132
transformedValue.schema(),
131-
transformedValue.value())
133+
transformedValue.value(),
134+
changeEvent.metadata.txCommitTime.toInstant().toEpochMilli(),
135+
Headers.from(changeEvent))
132136
})
133137
}
134138
}

source/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jQueryTaskTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.neo4j.connectors.kafka.source
1818

19-
import io.kotest.assertions.until.until
19+
import io.kotest.assertions.nondeterministic.until
2020
import io.kotest.matchers.collections.shouldContainExactly
2121
import java.time.Clock
2222
import java.time.Duration
@@ -444,6 +444,7 @@ class Neo4jQueryTaskTest {
444444
longToInt: Boolean = false
445445
): List<Map<String, Any>> =
446446
session.beginTransaction().use { tx ->
447+
val ts = clock.millis()
447448
val elements =
448449
(1..totalRecords).map {
449450
val result =
@@ -469,7 +470,7 @@ class Neo4jQueryTaskTest {
469470
| n AS node
470471
"""
471472
.trimMargin(),
472-
mapOf("timestamp" to clock.millis()))
473+
mapOf("timestamp" to ts + it))
473474
val next = result.next()
474475
val map = next.asMap().toMutableMap()
475476
map["array"] =

testing/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
</exclusion>
3131
</exclusions>
3232
</dependency>
33+
<dependency>
34+
<groupId>io.kotest</groupId>
35+
<artifactId>kotest-assertions-core-jvm</artifactId>
36+
</dependency>
3337
<dependency>
3438
<groupId>org.apache.avro</groupId>
3539
<artifactId>avro</artifactId>

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/assertions/TopicVerifier.kt

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.neo4j.connectors.kafka.testing.assertions
1818

19+
import io.kotest.matchers.nulls.shouldBeNull
1920
import java.time.Duration
2021
import java.util.function.Predicate
2122
import kotlin.math.min
@@ -32,35 +33,29 @@ class TopicVerifier<K, V>(private val consumer: KafkaConsumer<K, V>) {
3233

3334
private var messagePredicates = mutableListOf<Predicate<ConsumerRecord<K, V>>>()
3435

35-
fun assertMessageValue(assertion: (V) -> Unit): TopicVerifier<K, V> {
36-
@Suppress("DEPRECATION")
37-
return expectMessageValueMatching { value ->
36+
fun assertMessage(assertion: (ConsumerRecord<K, V>) -> Unit): TopicVerifier<K, V> {
37+
messagePredicates.add { record ->
3838
try {
39-
assertion(value)
39+
assertion(record)
4040
true
4141
} catch (e: java.lang.AssertionError) {
4242
log.debug("Assertion has failed", e)
4343
false
4444
}
4545
}
46+
return this
47+
}
48+
49+
fun assertMessageValue(assertion: (V) -> Unit): TopicVerifier<K, V> {
50+
return assertMessage { msg -> assertion(msg.value()) }
4651
}
4752

4853
fun assertMessageKey(assertion: (K) -> Unit): TopicVerifier<K, V> {
49-
messagePredicates.add { record ->
50-
try {
51-
assertion(record.key())
52-
true
53-
} catch (e: java.lang.AssertionError) {
54-
log.debug("Assertion has failed", e)
55-
false
56-
}
57-
}
58-
return this
54+
return assertMessage { msg -> assertion(msg.key()) }
5955
}
6056

6157
fun assertNoMessageKey(): TopicVerifier<K, V> {
62-
messagePredicates.add { record -> record.key() == null }
63-
return this
58+
return assertMessage { msg -> msg.key().shouldBeNull() }
6459
}
6560

6661
@Deprecated(message = "redundant API", replaceWith = ReplaceWith("assertMessageValue"))

0 commit comments

Comments
 (0)