1616 */
1717package org.neo4j.connectors.kafka.sink.utils
1818
19- import java.util.Locale
2019import kotlin.reflect.jvm.javaType
2120import org.neo4j.connectors.kafka.service.TopicType
2221import org.neo4j.connectors.kafka.service.TopicTypeGroup
@@ -68,27 +67,12 @@ data class Topics(
6867 TopicType .PATTERN_RELATIONSHIP to relPatternTopics)
6968
7069 companion object {
71- fun from (
72- originalConfig : Map <String , Any ?>,
73- dbName : String = "",
74- invalidTopics : List <String > = emptyList()
75- ): Topics {
76- val config =
77- originalConfig
78- .filterKeys {
79- if (dbName.isNotBlank()) it.lowercase(Locale .ROOT ).endsWith(" .to.$dbName " )
80- else ! it.contains(" .to." )
81- }
82- .mapKeys {
83- if (dbName.isNotBlank()) it.key.replace(" .to.$dbName " , " " , true ) else it.key
84- }
70+ fun from (config : Map <String , Any ?>, invalidTopics : List <String > = emptyList()): Topics {
8571 val cypherTopics = TopicUtils .filterByPrefix(config, SinkConfiguration .CYPHER_TOPIC_PREFIX )
8672 val mergeNodeProperties =
87- originalConfig [SinkConfiguration .PATTERN_NODE_MERGE_PROPERTIES ].toString().toBoolean()
73+ config [SinkConfiguration .PATTERN_NODE_MERGE_PROPERTIES ].toString().toBoolean()
8874 val mergeRelProperties =
89- originalConfig[SinkConfiguration .PATTERN_RELATIONSHIP_MERGE_PROPERTIES ]
90- .toString()
91- .toBoolean()
75+ config[SinkConfiguration .PATTERN_RELATIONSHIP_MERGE_PROPERTIES ].toString().toBoolean()
9276 val nodePatternTopics =
9377 TopicUtils .filterByPrefix(
9478 config, SinkConfiguration .PATTERN_NODE_TOPIC_PREFIX , invalidTopics)
@@ -110,12 +94,12 @@ data class Topics(
11094 TopicUtils .splitTopics(config[SinkConfiguration .CUD_TOPICS ] as ? String , invalidTopics)
11195 val sourceIdStrategyConfig =
11296 SourceIdIngestionStrategyConfig (
113- originalConfig
97+ config
11498 .getOrDefault(
11599 SinkConfiguration .CDC_SOURCE_ID_LABEL_NAME ,
116100 SourceIdIngestionStrategyConfig .DEFAULT .labelName)
117101 .toString(),
118- originalConfig
102+ config
119103 .getOrDefault(
120104 SinkConfiguration .CDC_SOURCE_ID_ID_NAME ,
121105 SourceIdIngestionStrategyConfig .DEFAULT .idName)
0 commit comments