Skip to content

Commit cb6a2a4

Browse files
authored
fix: parse indexed patterns with sorted order (#57)
1 parent 3ed32e2 commit cb6a2a4

File tree

2 files changed

+153
-48
lines changed

2 files changed

+153
-48
lines changed

source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt

Lines changed: 75 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -115,40 +115,42 @@ class SourceConfiguration(originals: Map<*, *>) :
115115
originals()
116116
.entries
117117
.filter { CDC_PATTERNS_REGEX.matches(it.key) }
118-
.flatMap {
119-
Pattern.parse(it.value as String?).map { key ->
120-
key to retrieveGroupsFromConfigKey(it.key, CDC_PATTERNS_REGEX).groupValues[1]
121-
}
122-
}
118+
.map { CdcPatternConfigItem(it, CDC_PATTERNS_REGEX) }
119+
.flatMap { Pattern.parse(it.value as String?).map { key -> key to it.topic } }
123120
.forEach {
124121
if (!configMap.containsKey(it.second)) {
125122
configMap[it.second] = mutableListOf()
126123
}
127124

128125
nonPositionalConfigMode[it.second] = true
129126

130-
val list = configMap[it.second]!!
127+
val list = configMap.getValue(it.second)
131128
list.add(it.first)
132129
}
133130

134131
originals()
135132
.entries
136133
.filter { CDC_PATTERN_ARRAY_REGEX.matches(it.key) }
134+
.map { CdcPatternConfigItem(it, CDC_PATTERN_ARRAY_REGEX) }
135+
.sorted()
137136
.forEach { mapPositionalPattern(it, nonPositionalConfigMode, configMap) }
138137

139138
originals()
140139
.entries
141140
.filter { CDC_PATTERN_ARRAY_OPERATION_REGEX.matches(it.key) }
141+
.map { CdcPatternConfigItem(it, CDC_PATTERN_ARRAY_OPERATION_REGEX) }
142142
.forEach { mapOperation(it, nonPositionalConfigMode, configMap) }
143143

144144
originals()
145145
.entries
146146
.filter { CDC_PATTERN_ARRAY_CHANGES_TO_REGEX.matches(it.key) }
147+
.map { CdcPatternConfigItem(it, CDC_PATTERN_ARRAY_CHANGES_TO_REGEX) }
147148
.forEach { mapChangesTo(it, nonPositionalConfigMode, configMap) }
148149

149150
originals()
150151
.entries
151152
.filter { CDC_PATTERN_ARRAY_METADATA_REGEX.matches(it.key) }
153+
.map { CdcPatternConfigItem(it, CDC_PATTERN_ARRAY_METADATA_REGEX) }
152154
.forEach { mapMetadata(it, nonPositionalConfigMode, configMap) }
153155

154156
pivotMapCdcSelectorMap(configMap)
@@ -163,51 +165,52 @@ class SourceConfiguration(originals: Map<*, *>) :
163165
originals()
164166
.entries
165167
.filter { CDC_KEY_STRATEGY_REGEX.matches(it.key) }
168+
.map { CdcPatternConfigItem(it, CDC_KEY_STRATEGY_REGEX) }
166169
.associate { mapKeyStrategy(it) }
167170
}
168171
else -> emptyMap()
169172
}
170173
}
171174

172175
private fun mapPositionalPattern(
173-
configEntry: MutableMap.MutableEntry<String, Any>,
176+
configEntry: CdcPatternConfigItem,
174177
nonPositionalConfigMode: MutableMap<String, Boolean>,
175178
configMap: MutableMap<String, MutableList<Pattern>>
176179
) {
177-
val matchGroups = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_REGEX)
178-
val topicName = matchGroups.groupValues[1]
180+
val topicName = configEntry.topic
179181
if (nonPositionalConfigMode.getOrDefault(topicName, false)) {
180182
throw ConfigException(
181183
"It's not allowed to mix positional and non-positional configuration for the same topic.",
182184
)
183185
}
184-
val index = Integer.parseInt(matchGroups.groupValues[3])
185186
val patterns = Pattern.parse(configEntry.value as String?)
186-
if (index > patterns.size - 1) {
187-
throw ConfigException(
188-
"Index $index out of bounds. Please ensure that you started the definition with a 0-based index.",
189-
)
190-
}
191187
if (patterns.size > 1) {
192188
throw ConfigException(
193189
"Too many patterns. Only one pattern allowed for positional pattern configuration.",
194190
)
195191
}
196-
val pattern = patterns.get(0)
192+
193+
val index = configEntry.index!!
194+
val pattern = patterns[0]
197195
if (!configMap.containsKey(topicName)) {
198196
configMap[topicName] = mutableListOf()
199197
}
200-
val list = configMap[topicName]!!
198+
199+
val list = configMap.getValue(topicName)
200+
if (index > list.size) {
201+
throw ConfigException(
202+
"Index $index out of bounds. Please ensure that you started the definition with a 0-based index.",
203+
)
204+
}
201205
list.add(pattern)
202206
}
203207

204208
private fun mapOperation(
205-
configEntry: MutableMap.MutableEntry<String, Any>,
209+
configEntry: CdcPatternConfigItem,
206210
nonPositionalConfigMode: MutableMap<String, Boolean>,
207211
configMap: MutableMap<String, MutableList<Pattern>>
208212
) {
209-
val matchGroup = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_OPERATION_REGEX)
210-
val (index, patterns) = retrieveIndexAndPattern(matchGroup, nonPositionalConfigMode, configMap)
213+
val (index, patterns) = retrieveIndexAndPattern(configEntry, nonPositionalConfigMode, configMap)
211214
val operation =
212215
when (val value = (configEntry.value as String).lowercase()) {
213216
"create" -> EntityOperation.CREATE
@@ -218,34 +221,31 @@ class SourceConfiguration(originals: Map<*, *>) :
218221
"Cannot parse $value as an operation. Allowed operations are create, delete or update.")
219222
}
220223
}
221-
val pattern = patterns.get(index)
224+
val pattern = patterns[index]
222225
pattern.withOperation(operation)
223226
}
224227

225228
private fun mapChangesTo(
226-
configEntry: MutableMap.MutableEntry<String, Any>,
229+
configEntry: CdcPatternConfigItem,
227230
nonPositionalConfigMode: MutableMap<String, Boolean>,
228231
configMap: MutableMap<String, MutableList<Pattern>>
229232
) {
230-
val matchGroup =
231-
retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_CHANGES_TO_REGEX)
232-
val (index, patterns) = retrieveIndexAndPattern(matchGroup, nonPositionalConfigMode, configMap)
233+
val (index, patterns) = retrieveIndexAndPattern(configEntry, nonPositionalConfigMode, configMap)
233234
val value = configEntry.value as String
234235
val changesTo = value.splitToSequence(",").map { term -> term.trim() }.toSet()
235236
val pattern = patterns.get(index)
236237
pattern.withChangesTo(changesTo)
237238
}
238239

239240
private fun mapMetadata(
240-
configEntry: MutableMap.MutableEntry<String, Any>,
241+
configEntry: CdcPatternConfigItem,
241242
nonPositionalConfigMode: MutableMap<String, Boolean>,
242243
configMap: MutableMap<String, MutableList<Pattern>>
243244
) {
244-
val matchGroup = retrieveGroupsFromConfigKey(configEntry.key, CDC_PATTERN_ARRAY_METADATA_REGEX)
245-
val (index, patterns) = retrieveIndexAndPattern(matchGroup, nonPositionalConfigMode, configMap)
246-
val keyValue = matchGroup.groupValues[5]
245+
val (index, patterns) = retrieveIndexAndPattern(configEntry, nonPositionalConfigMode, configMap)
246+
val keyValue = configEntry.metadata!!
247247
var value = configEntry.value
248-
val pattern = patterns.get(index)
248+
val pattern = patterns[index]
249249
if (keyValue.startsWith(EntitySelector.METADATA_KEY_TX_METADATA + '.')) {
250250
value =
251251
mapOf(
@@ -266,32 +266,31 @@ class SourceConfiguration(originals: Map<*, *>) :
266266
}
267267

268268
private fun mapKeyStrategy(
269-
configEntry: MutableMap.MutableEntry<String, Any>
269+
configEntry: CdcPatternConfigItem,
270270
): Pair<String, Neo4jCdcKeyStrategy> {
271-
val matchGroup = retrieveGroupsFromConfigKey(configEntry.key, CDC_KEY_STRATEGY_REGEX)
272-
val topicName = matchGroup.groupValues[1]
271+
val topicName = configEntry.topic
273272
val value = configEntry.value
274273
return topicName to Neo4jCdcKeyStrategy.valueOf(value as String)
275274
}
276275

277276
private fun retrieveIndexAndPattern(
278-
matchGroup: MatchResult,
277+
configEntry: CdcPatternConfigItem,
279278
nonPositionalConfigMode: MutableMap<String, Boolean>,
280279
configMap: MutableMap<String, MutableList<Pattern>>
281280
): Pair<Int, MutableList<Pattern>> {
282-
val topicName = matchGroup.groupValues[1]
281+
val topicName = configEntry.topic
283282
if (nonPositionalConfigMode.getOrDefault(topicName, false)) {
284283
throw ConfigException(
285284
"It's not allowed to mix positional and non-positional configuration for the same topic.",
286285
)
287286
}
288-
val index = Integer.parseInt(matchGroup.groupValues[3])
287+
val index = configEntry.index!!
289288
if (!configMap.containsKey(topicName)) {
290289
throw ConfigException(
291290
"Cannot assign config value because pattern is not defined for index $index.",
292291
)
293292
}
294-
val patterns = configMap.get(topicName)!!
293+
val patterns = configMap.getValue(topicName)
295294
if (index > patterns.size - 1) {
296295
throw ConfigException(
297296
"Index $index out of bounds. Please ensure that you started the definition with a 0-based index.",
@@ -300,9 +299,6 @@ class SourceConfiguration(originals: Map<*, *>) :
300299
return Pair(index, patterns)
301300
}
302301

303-
private fun retrieveGroupsFromConfigKey(configKey: String, regex: Regex) =
304-
regex.matchEntire(configKey)!!
305-
306302
private fun pivotMapCdcSelectorMap(
307303
patternMap: Map<String, List<Pattern>>
308304
): Map<Selector, List<String>> {
@@ -313,8 +309,10 @@ class SourceConfiguration(originals: Map<*, *>) :
313309
if (!selectorBasedMap.containsKey(selector)) {
314310
selectorBasedMap[selector] = mutableListOf()
315311
}
316-
selectorBasedMap[selector]!!.add(it.key)
317-
selectorBasedMap[selector]!!.sort()
312+
313+
val topics = selectorBasedMap.getValue(selector)
314+
topics.add(it.key)
315+
topics.sort()
318316
}
319317
}
320318
}
@@ -368,6 +366,28 @@ class SourceConfiguration(originals: Map<*, *>) :
368366
}
369367
}
370368

369+
data class CdcPatternConfigItem(val entry: Map.Entry<String, Any>, private val pattern: Regex) :
370+
Comparable<CdcPatternConfigItem> {
371+
private val match = pattern.matchEntire(entry.key)!!
372+
373+
val key = entry.key
374+
val value = entry.value
375+
val topic = match.groups[GROUP_NAME_TOPIC]!!.value
376+
377+
val index
378+
get(): Int? = match.groups[GROUP_NAME_INDEX]?.value?.toInt()
379+
380+
val metadata
381+
get(): String? = match.groups[GROUP_NAME_METADATA]?.value
382+
383+
override fun compareTo(other: CdcPatternConfigItem): Int {
384+
return when (val result = this.topic.compareTo(other.topic)) {
385+
0 -> (this.index ?: -1).compareTo(other.index ?: -1)
386+
else -> result
387+
}
388+
}
389+
}
390+
371391
companion object {
372392
const val START_FROM = "neo4j.start-from"
373393
const val START_FROM_VALUE = "neo4j.start-from.value"
@@ -382,20 +402,27 @@ class SourceConfiguration(originals: Map<*, *>) :
382402
const val ENFORCE_SCHEMA = "neo4j.enforce-schema"
383403
const val CDC_POLL_INTERVAL = "neo4j.cdc.poll-interval"
384404
const val CDC_POLL_DURATION = "neo4j.cdc.poll-duration"
385-
private val CDC_PATTERNS_REGEX = Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)$")
405+
private const val GROUP_NAME_TOPIC = "topic"
406+
private const val GROUP_NAME_INDEX = "index"
407+
private const val GROUP_NAME_METADATA = "metadata"
408+
private val CDC_PATTERNS_REGEX =
409+
Regex("^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.patterns)$")
386410
private val CDC_KEY_STRATEGY_REGEX =
387411
Regex(
388-
"^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.key-strategy)$",
412+
"^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.key-strategy)$",
389413
)
390414
private val CDC_PATTERN_ARRAY_REGEX =
391-
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.pattern)$")
415+
Regex(
416+
"^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.patterns)\\.(?<$GROUP_NAME_INDEX>[0-9]+)(\\.pattern)$")
392417
private val CDC_PATTERN_ARRAY_OPERATION_REGEX =
393-
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.operation)$")
418+
Regex(
419+
"^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.patterns)\\.(?<$GROUP_NAME_INDEX>[0-9]+)(\\.operation)$")
394420
private val CDC_PATTERN_ARRAY_CHANGES_TO_REGEX =
395-
Regex("^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.changesTo)$")
421+
Regex(
422+
"^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.patterns)\\.(?<$GROUP_NAME_INDEX>[0-9]+)(\\.changesTo)$")
396423
private val CDC_PATTERN_ARRAY_METADATA_REGEX =
397424
Regex(
398-
"^neo4j\\.cdc\\.topic\\.([a-zA-Z0-9._-]+)(\\.patterns)\\.([0-9]+)(\\.metadata)\\.([a-zA-Z0-9._-]+)$")
425+
"^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.patterns)\\.(?<$GROUP_NAME_INDEX>[0-9]+)(\\.metadata)\\.(?<$GROUP_NAME_METADATA>[a-zA-Z0-9._-]+)$")
399426

400427
private val DEFAULT_POLL_INTERVAL = 10.seconds
401428
private const val DEFAULT_BATCH_SIZE = 1000

source/src/test/kotlin/org/neo4j/connectors/kafka/source/SourceConfigurationTest.kt

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.neo4j.connectors.kafka.source
1818

19+
import io.kotest.matchers.collections.shouldHaveSize
1920
import io.kotest.matchers.maps.shouldContainAll
2021
import io.kotest.matchers.maps.shouldContainExactly
2122
import io.kotest.matchers.shouldBe
@@ -557,6 +558,83 @@ class SourceConfigurationTest {
557558
}
558559
}
559560

561+
@Test
562+
fun `should extract selectors correctly with indexed patterns`() {
563+
assertDoesNotThrow {
564+
val configuration =
565+
SourceConfiguration(
566+
mapOf(
567+
"neo4j.uri" to "neo4j://neo4j:7687",
568+
"neo4j.authentication.type" to "BASIC",
569+
"neo4j.authentication.basic.username" to "neo4j",
570+
"neo4j.authentication.basic.password" to "password",
571+
"neo4j.source-strategy" to "CDC",
572+
"neo4j.start-from" to "NOW",
573+
"neo4j.cdc.poll-interval" to "5s",
574+
"neo4j.cdc.topic.my-topic.patterns.0.pattern" to "(:Person)",
575+
"neo4j.cdc.topic.my-topic.patterns.0.operation" to "create",
576+
"neo4j.cdc.topic.my-topic.patterns.0.changesTo" to "name,surname",
577+
"neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser" to "neo4j",
578+
"neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser" to "neo4j",
579+
"neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app" to "sales",
580+
"neo4j.cdc.topic.my-topic.patterns.1.pattern" to "(:Person)-[:KNOWS]->(:Person)",
581+
"neo4j.cdc.topic.my-topic.patterns.1.operation" to "update",
582+
"neo4j.cdc.topic.my-topic.patterns.1.changesTo" to "since",
583+
"neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser" to "neo4j",
584+
"neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser" to "neo4j",
585+
"neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app" to "sales",
586+
))
587+
588+
configuration.validate()
589+
configuration.cdcSelectors shouldHaveSize 2
590+
configuration.cdcSelectors shouldBe
591+
setOf(
592+
NodeSelector(
593+
EntityOperation.CREATE,
594+
setOf("name", "surname"),
595+
setOf("Person"),
596+
emptyMap(),
597+
mapOf(
598+
"authenticatedUser" to "neo4j",
599+
"executingUser" to "neo4j",
600+
"txMetadata" to mapOf("app" to "sales"))),
601+
RelationshipSelector(
602+
EntityOperation.UPDATE,
603+
setOf("since"),
604+
"KNOWS",
605+
RelationshipNodeSelector(setOf("Person"), emptyMap()),
606+
RelationshipNodeSelector(setOf("Person"), emptyMap()),
607+
emptyMap(),
608+
mapOf(
609+
"authenticatedUser" to "neo4j",
610+
"executingUser" to "neo4j",
611+
"txMetadata" to mapOf("app" to "sales"))),
612+
)
613+
configuration.cdcSelectorsToTopics shouldBe
614+
mapOf(
615+
NodeSelector(
616+
EntityOperation.CREATE,
617+
setOf("name", "surname"),
618+
setOf("Person"),
619+
emptyMap(),
620+
mapOf(
621+
"authenticatedUser" to "neo4j",
622+
"executingUser" to "neo4j",
623+
"txMetadata" to mapOf("app" to "sales"))) to listOf("my-topic"),
624+
RelationshipSelector(
625+
EntityOperation.UPDATE,
626+
setOf("since"),
627+
"KNOWS",
628+
RelationshipNodeSelector(setOf("Person"), emptyMap()),
629+
RelationshipNodeSelector(setOf("Person"), emptyMap()),
630+
emptyMap(),
631+
mapOf(
632+
"authenticatedUser" to "neo4j",
633+
"executingUser" to "neo4j",
634+
"txMetadata" to mapOf("app" to "sales"))) to listOf("my-topic"))
635+
}
636+
}
637+
560638
@Test
561639
fun `strictly parses transaction metadata selector`() {
562640
val config =

0 commit comments

Comments
 (0)