Skip to content

Commit ab110fd

Browse files
authored
feat: provide additional pattern config mechanism (#28)
1 parent eb8ad95 commit ab110fd

File tree

8 files changed

+495
-68
lines changed

8 files changed

+495
-68
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ object ChangeEventExtensions {
6868
.field("captureMode", SimpleTypes.STRING.schema())
6969
.field("txStartTime", SimpleTypes.ZONEDDATETIME.schema())
7070
.field("txCommitTime", SimpleTypes.ZONEDDATETIME.schema())
71+
.field("txMetadata", DynamicTypes.schemaFor(this.txMetadata, true).schema())
7172
.also {
7273
this.additionalEntries.forEach { entry ->
7374
it.field(entry.key, DynamicTypes.schemaFor(entry.value, true))
@@ -86,6 +87,9 @@ object ChangeEventExtensions {
8687
it.put("captureMode", this.captureMode.name)
8788
it.put("txStartTime", DateTimeFormatter.ISO_DATE_TIME.format(this.txStartTime))
8889
it.put("txCommitTime", DateTimeFormatter.ISO_DATE_TIME.format(this.txCommitTime))
90+
it.put(
91+
"txMetadata",
92+
DynamicTypes.valueFor(schema.field("txMetadata").schema(), this.txMetadata))
8993

9094
this.additionalEntries.forEach { entry ->
9195
it.put(entry.key, DynamicTypes.valueFor(schema.field(entry.key).schema(), entry.value))

common/src/test/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensionsTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,8 @@ class ChangeEventExtensionsTest {
802802
"127.0.0.1:7687",
803803
ZonedDateTime.now().minusSeconds(1),
804804
ZonedDateTime.now(),
805-
mapOf("txMetadata" to mapOf("user" to "app_user", "app" to "hr"))),
805+
mapOf("user" to "app_user", "app" to "hr"),
806+
emptyMap()),
806807
event)
807808
val schemaAndValue = change.toConnectValue()
808809

docker/docker-compose.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
services:
33
neo4j:
4-
image: neo4j:5.12-enterprise
4+
image: neo4j:5-enterprise
55
hostname: neo4j
66
container_name: neo4j
77
ports:
@@ -11,7 +11,6 @@ services:
1111
NEO4J_AUTH: neo4j/password
1212
NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
1313
NEO4J_server_memory_heap_max__size: "4G"
14-
NEO4J_internal_dbms_change__data__capture: "true"
1514
healthcheck:
1615
test: [ "CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1" ]
1716
start_period: 2m

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<avro.version>1.11.3</avro.version>
4242
<awaitility.version>4.2.0</awaitility.version>
4343
<build-resources.version>1.0.0</build-resources.version>
44-
<cdc.version>1.0.3</cdc.version>
44+
<cdc.version>1.0.4</cdc.version>
4545
<commons-collections4.version>4.4</commons-collections4.version>
4646
<commons-lang3.version>3.13.0</commons-lang3.version>
4747
<hamcrest.version>2.2</hamcrest.version>

source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt

Lines changed: 194 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.Config
2424
import org.apache.kafka.common.config.ConfigDef
2525
import org.apache.kafka.common.config.ConfigDef.Range
2626
import org.apache.kafka.common.config.ConfigException
27+
import org.neo4j.cdc.client.model.EntityOperation
2728
import org.neo4j.cdc.client.pattern.Pattern
2829
import org.neo4j.cdc.client.pattern.PatternException
2930
import org.neo4j.cdc.client.selector.EntitySelector
@@ -108,46 +109,204 @@ class SourceConfiguration(originals: Map<*, *>) :
108109
val cdcSelectorsToTopics: Map<Selector, List<String>> by lazy {
109110
when (strategy) {
110111
SourceType.CDC -> {
111-
val map = mutableMapOf<Selector, MutableList<String>>()
112+
val configMap = mutableMapOf<String, MutableList<Pattern>>()
113+
val nonPositionalConfigMode = mutableMapOf<String, Boolean>()
112114

113115
originals()
114116
.entries
115117
.filter { CDC_PATTERNS_REGEX.matches(it.key) }
116118
.flatMap {
117-
Pattern.parse(it.value as String?)
118-
.flatMap { it.toSelector() }
119-
.map { key -> key to CDC_PATTERNS_REGEX.matchEntire(it.key)!!.groupValues[1] }
119+
Pattern.parse(it.value as String?).map { key ->
120+
key to retrieveGroupsFromConfigKey(it.key, CDC_PATTERNS_REGEX).groupValues[1]
121+
}
120122
}
121123
.forEach {
122-
if (!map.containsKey(it.first)) {
123-
map[it.first] = mutableListOf()
124+
if (!configMap.containsKey(it.second)) {
125+
configMap[it.second] = mutableListOf()
124126
}
125127

126-
val list = map[it.first]!!
127-
list.add(it.second)
128-
list.sort()
128+
nonPositionalConfigMode[it.second] = true
129+
130+
val list = configMap[it.second]!!
131+
list.add(it.first)
129132
}
133+
originals()
134+
.entries
135+
.filter { CDC_PATTERN_ARRAY_REGEX.matches(it.key) }
136+
.forEach { mapPositionalPattern(it, nonPositionalConfigMode, configMap) }
130137

131-
map
138+
originals()
139+
.entries
140+
.filter { CDC_PATTERN_ARRAY_OPERATION_REGEX.matches(it.key) }
141+
.forEach { mapOperation(it, nonPositionalConfigMode, configMap) }
142+
originals()
143+
.entries
144+
.filter { CDC_PATTERN_ARRAY_CHANGES_TO_REGEX.matches(it.key) }
145+
.forEach { mapChangesTo(it, nonPositionalConfigMode, configMap) }
146+
originals()
147+
.entries
148+
.filter { CDC_PATTERN_ARRAY_METADATA_REGEX.matches(it.key) }
149+
.forEach { mapMetadata(it, nonPositionalConfigMode, configMap) }
150+
151+
pivotMapCdcSelectorMap(configMap)
132152
}
133153
else -> emptyMap()
134154
}
135155
}
136156

157+
/**
 * Registers one positional pattern entry
 * (`neo4j.cdc.topic.<topic>.patterns.<index>.pattern`) for its topic.
 *
 * A [ConfigException] is thrown when the topic already uses the non-positional `.patterns` key,
 * when the value does not parse to exactly one pattern, or when the index would leave a gap in
 * the list of patterns collected so far for the topic.
 *
 * @param configEntry raw configuration entry whose key matches [CDC_PATTERN_ARRAY_REGEX]
 * @param nonPositionalConfigMode topics flagged as configured through the non-positional key
 * @param configMap patterns collected per topic; the parsed pattern is appended to the topic's list
 */
private fun mapPositionalPattern(
    configEntry: MutableMap.MutableEntry<String, Any>,
    nonPositionalConfigMode: MutableMap<String, Boolean>,
    configMap: MutableMap<String, MutableList<Pattern>>
) {
  val matchGroups = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_REGEX)
  val topicName = matchGroups.groupValues[1]
  if (nonPositionalConfigMode.getOrDefault(topicName, false)) {
    throw ConfigException(
        "It's not allowed to mix positional and non-positional configuration for the same topic.",
    )
  }
  val index = Integer.parseInt(matchGroups.groupValues[3])

  val patterns = Pattern.parse(configEntry.value as String?)
  if (patterns.size > 1) {
    throw ConfigException(
        "Too many patterns. Only one pattern allowed for positional pattern configuration.",
    )
  }
  val pattern =
      patterns.firstOrNull()
          ?: throw ConfigException(
              "No pattern found. Exactly one pattern is required for positional pattern configuration.",
          )

  // The index must be checked against the patterns already registered for this topic, not
  // against the single-element list parsed above; otherwise every index >= 1 is rejected.
  // NOTE(review): this requires entries to arrive in ascending index order. The caller iterates
  // originals().entries directly - confirm that order is guaranteed or sort by index first.
  val registered = configMap[topicName]?.size ?: 0
  if (index > registered) {
    throw ConfigException(
        "Index $index out of bounds. Please ensure that you started the definition with a 0-based index.",
    )
  }
  configMap.getOrPut(topicName) { mutableListOf() }.add(pattern)
}
188+
189+
/**
 * Applies a positional `.operation` entry to the pattern it addresses.
 *
 * The value is compared case-insensitively against `create`, `update` and `delete`; any other
 * value raises a [ConfigException]. Topic and index validation is delegated to
 * [retrieveIndexAndPattern].
 *
 * @param configEntry raw configuration entry whose key matches [CDC_PATTERN_ARRAY_OPERATION_REGEX]
 * @param nonPositionalConfigMode topics flagged as configured through the non-positional key
 * @param configMap patterns collected per topic
 */
private fun mapOperation(
    configEntry: MutableMap.MutableEntry<String, Any>,
    nonPositionalConfigMode: MutableMap<String, Boolean>,
    configMap: MutableMap<String, MutableList<Pattern>>
) {
  val keyMatch = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_OPERATION_REGEX)
  val (position, topicPatterns) =
      retrieveIndexAndPattern(keyMatch, nonPositionalConfigMode, configMap)

  val value = (configEntry.value as String).lowercase()
  val operation =
      when (value) {
        "create" -> EntityOperation.CREATE
        "update" -> EntityOperation.UPDATE
        "delete" -> EntityOperation.DELETE
        else ->
            throw ConfigException(
                "Cannot parse $value as an operation. Allowed operations are create, delete or update.")
      }

  // NOTE(review): the result of withOperation is not used, so this presumably mutates the
  // pattern in place - confirm against the CDC client API.
  topicPatterns[position].withOperation(operation)
}
209+
210+
/**
 * Applies a positional `.changesTo` entry to the pattern it addresses.
 *
 * The value is split on commas and each term is trimmed; the resulting set is handed to the
 * pattern. Topic and index validation is delegated to [retrieveIndexAndPattern].
 *
 * @param configEntry raw configuration entry whose key matches [CDC_PATTERN_ARRAY_CHANGES_TO_REGEX]
 * @param nonPositionalConfigMode topics flagged as configured through the non-positional key
 * @param configMap patterns collected per topic
 */
private fun mapChangesTo(
    configEntry: MutableMap.MutableEntry<String, Any>,
    nonPositionalConfigMode: MutableMap<String, Boolean>,
    configMap: MutableMap<String, MutableList<Pattern>>
) {
  val keyMatch = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_CHANGES_TO_REGEX)
  val (position, topicPatterns) =
      retrieveIndexAndPattern(keyMatch, nonPositionalConfigMode, configMap)

  val rawTerms = configEntry.value as String
  val properties = rawTerms.split(",").map(String::trim).toSet()

  topicPatterns[position].withChangesTo(properties)
}
223+
224+
/**
 * Applies a positional `.metadata.<key>` entry to the pattern it addresses.
 *
 * Keys starting with [EntitySelector.METADATA_KEY_TX_METADATA] are nested: the remainder of the
 * key (after the prefix and a dot) becomes a key in an inner map stored under the transaction
 * metadata key. Every other key is passed through as a flat single-entry map.
 *
 * @param configEntry raw configuration entry whose key matches [CDC_PATTERN_ARRAY_METADATA_REGEX]
 * @param nonPositionalConfigMode topics flagged as configured through the non-positional key
 * @param configMap patterns collected per topic
 */
private fun mapMetadata(
    configEntry: MutableMap.MutableEntry<String, Any>,
    nonPositionalConfigMode: MutableMap<String, Boolean>,
    configMap: MutableMap<String, MutableList<Pattern>>
) {
  val keyMatch = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_METADATA_REGEX)
  val (position, topicPatterns) =
      retrieveIndexAndPattern(keyMatch, nonPositionalConfigMode, configMap)

  val metadataKey = keyMatch.groupValues[5]
  val rawValue = configEntry.value
  val target = topicPatterns[position]
  val txMetadataKey = EntitySelector.METADATA_KEY_TX_METADATA

  // NOTE(review): startsWith also accepts keys that lack the '.' separator after the prefix;
  // removePrefix then leaves such a key unchanged inside the nested map - confirm this is intended.
  val metadata: Map<String, Any> =
      if (metadataKey.startsWith(txMetadataKey)) {
        mapOf(txMetadataKey to mapOf(metadataKey.removePrefix(txMetadataKey + '.') to rawValue))
      } else {
        mapOf(metadataKey to rawValue)
      }

  target.withMetadata(metadata)
}
252+
253+
/**
 * Resolves the topic (group 1) and positional index (group 3) of a matched configuration key and
 * returns the index together with the patterns collected for that topic.
 *
 * @param matchGroup match of a positional configuration key
 * @param nonPositionalConfigMode topics flagged as configured through the non-positional key
 * @param configMap patterns collected per topic
 * @return the parsed index paired with the topic's pattern list
 * @throws ConfigException when the topic is configured non-positionally, has no patterns
 *   registered, or the index is not smaller than the number of registered patterns
 */
private fun retrieveIndexAndPattern(
    matchGroup: MatchResult,
    nonPositionalConfigMode: MutableMap<String, Boolean>,
    configMap: MutableMap<String, MutableList<Pattern>>
): Pair<Int, MutableList<Pattern>> {
  val groups = matchGroup.groupValues
  val topic = groups[1]
  if (nonPositionalConfigMode[topic] == true) {
    throw ConfigException(
        "It's not allowed to mix positional and non-positional configuration for the same topic.",
    )
  }

  val index = groups[3].toInt()
  val topicPatterns =
      configMap[topic]
          ?: throw ConfigException(
              "Cannot assign config value because pattern is not defined for index $index.",
          )
  if (index >= topicPatterns.size) {
    throw ConfigException(
        "Index $index out of bounds. Please ensure that you started the definition with a 0-based index.",
    )
  }
  return index to topicPatterns
}
278+
279+
/**
 * Matches [configKey] in full against [regex] and returns the match.
 *
 * @throws NullPointerException when the key does not match the expression
 */
private fun retrieveGroupsFromConfigKey(configKey: String, regex: Regex): MatchResult {
  val fullMatch = regex.matchEntire(configKey)
  // Call sites visible in this file filter keys with the same regex first, so a miss here
  // would indicate a programming error rather than bad user input.
  return fullMatch!!
}
281+
282+
/**
 * Inverts a topic-to-patterns map into a selector-to-topics map.
 *
 * Each pattern is expanded through `toSelector()`, and the owning topic name is recorded under
 * every selector it yields. The topic list of each selector is returned in sorted order.
 *
 * @param patternMap patterns keyed by topic name
 * @return topic names keyed by selector, each list sorted ascending
 */
private fun pivotMapCdcSelectorMap(
    patternMap: Map<String, List<Pattern>>
): Map<Selector, List<String>> {
  val selectorBasedMap = mutableMapOf<Selector, MutableList<String>>()
  for ((topic, patterns) in patternMap) {
    for (pattern in patterns) {
      for (selector in pattern.toSelector()) {
        selectorBasedMap.getOrPut(selector) { mutableListOf() }.add(topic)
      }
    }
  }

  // Sort each list once after collection instead of after every insertion; the result is the same.
  selectorBasedMap.values.forEach { it.sort() }

  return selectorBasedMap
}
300+
137301
val cdcSelectors: Set<Selector> by lazy {
138302
cdcSelectorsToTopics.keys
139303
.map {
140304
when (it) {
141-
is NodeSelector ->
142-
NodeSelector(
143-
it.change,
144-
it.changesTo,
145-
it.labels,
146-
it.key,
147-
)
305+
is NodeSelector -> NodeSelector(it.change, it.changesTo, it.labels, it.key, it.metadata)
148306
is RelationshipSelector ->
149-
RelationshipSelector(it.change, it.changesTo, it.type, it.start, it.end, it.key)
150-
is EntitySelector -> EntitySelector(it.change, it.changesTo)
307+
RelationshipSelector(
308+
it.change, it.changesTo, it.type, it.start, it.end, it.key, it.metadata)
309+
is EntitySelector -> EntitySelector(it.change, it.changesTo, it.metadata)
151310
else -> throw IllegalStateException("unexpected pattern type ${it.javaClass.name}")
152311
}
153312
}
@@ -199,8 +358,16 @@ class SourceConfiguration(originals: Map<*, *>) :
199358
const val ENFORCE_SCHEMA = "neo4j.enforce-schema"
200359
const val CDC_POLL_INTERVAL = "neo4j.cdc.poll-interval"
201360
const val CDC_POLL_DURATION = "neo4j.cdc.poll-duration"
202-
private val CDC_PATTERNS_REGEX =
203-
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)?$")
361+
private val CDC_PATTERNS_REGEX = Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)$")
362+
private val CDC_PATTERN_ARRAY_REGEX =
363+
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.pattern)$")
364+
private val CDC_PATTERN_ARRAY_OPERATION_REGEX =
365+
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.operation)$")
366+
private val CDC_PATTERN_ARRAY_CHANGES_TO_REGEX =
367+
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.changesTo)$")
368+
private val CDC_PATTERN_ARRAY_METADATA_REGEX =
369+
Regex(
370+
"^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.metadata)\\.([a-zA-Z0-9._-]+)$")
204371

205372
private val DEFAULT_POLL_INTERVAL = 10.seconds
206373
private const val DEFAULT_BATCH_SIZE = 1000
@@ -266,7 +433,12 @@ class SourceConfiguration(originals: Map<*, *>) :
266433
val configList = config.configValues().toList()
267434
val strategy = configList.find { it.name() == STRATEGY }
268435
if (strategy?.value() == SourceType.CDC.name) {
269-
val cdcTopics = originals.entries.filter { CDC_PATTERNS_REGEX.matches(it.key) }
436+
val cdcTopics =
437+
originals.entries.filter { CDC_PATTERNS_REGEX.matches(it.key) } +
438+
originals.entries.filter { CDC_PATTERN_ARRAY_REGEX.matches(it.key) } +
439+
originals.entries.filter { CDC_PATTERN_ARRAY_OPERATION_REGEX.matches(it.key) } +
440+
originals.entries.filter { CDC_PATTERN_ARRAY_CHANGES_TO_REGEX.matches(it.key) } +
441+
originals.entries.filter { CDC_PATTERN_ARRAY_METADATA_REGEX.matches(it.key) }
270442
if (cdcTopics.isEmpty()) {
271443
strategy.addErrorMessage(
272444
"At least one topic needs to be configured with pattern(s) describing the entities to query changes for. Please refer to documentation for more information.")

0 commit comments

Comments
 (0)