Skip to content

Commit e39f237

Browse files
authored
refactor: make sink config consistent with the source (#48)
1 parent d869af0 commit e39f237

File tree

13 files changed

+352
-164
lines changed

13 files changed

+352
-164
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/service/StreamsSinkService.kt

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,20 @@ package org.neo4j.connectors.kafka.service
1818

1919
import org.neo4j.connectors.kafka.service.sink.strategy.IngestionStrategy
2020

21-
const val STREAMS_TOPIC_KEY: String = "streams.sink.topic"
22-
const val STREAMS_TOPIC_CDC_KEY: String = "streams.sink.topic.cdc"
23-
2421
enum class TopicTypeGroup {
2522
CYPHER,
2623
CDC,
2724
PATTERN,
2825
CUD
2926
}
3027

31-
enum class TopicType(val group: TopicTypeGroup, val key: String) {
32-
CDC_SOURCE_ID(group = TopicTypeGroup.CDC, key = "$STREAMS_TOPIC_CDC_KEY.sourceId"),
33-
CYPHER(group = TopicTypeGroup.CYPHER, key = "$STREAMS_TOPIC_KEY.cypher"),
34-
PATTERN_NODE(group = TopicTypeGroup.PATTERN, key = "$STREAMS_TOPIC_KEY.pattern.node"),
35-
PATTERN_RELATIONSHIP(
36-
group = TopicTypeGroup.PATTERN, key = "$STREAMS_TOPIC_KEY.pattern.relationship"),
37-
CDC_SCHEMA(group = TopicTypeGroup.CDC, key = "$STREAMS_TOPIC_CDC_KEY.schema"),
38-
CUD(group = TopicTypeGroup.CUD, key = "$STREAMS_TOPIC_KEY.cud")
28+
enum class TopicType(val group: TopicTypeGroup) {
29+
CDC_SOURCE_ID(group = TopicTypeGroup.CDC),
30+
CYPHER(group = TopicTypeGroup.CYPHER),
31+
PATTERN_NODE(group = TopicTypeGroup.PATTERN),
32+
PATTERN_RELATIONSHIP(group = TopicTypeGroup.PATTERN),
33+
CDC_SCHEMA(group = TopicTypeGroup.CDC),
34+
CUD(group = TopicTypeGroup.CUD)
3935
}
4036

4137
data class StreamsSinkEntity(val key: Any?, val value: Any?)

common/src/main/kotlin/org/neo4j/connectors/kafka/utils/PropertiesUtil.kt

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,35 @@ class PropertiesUtil {
2424
companion object {
2525
private val LOGGER = LoggerFactory.getLogger(PropertiesUtil::class.java)
2626
private const val DEFAULT_VERSION = "unknown"
27-
private var properties: Properties? = null
28-
private var VERSION: String? = null
27+
private var properties: Properties = Properties()
28+
private var VERSION: String
2929

3030
init {
31-
properties = Properties()
32-
properties!!.load(
31+
properties.load(
3332
PropertiesUtil::class.java.getResourceAsStream("/neo4j-configuration.properties"))
34-
properties!!.load(
33+
properties.load(
3534
PropertiesUtil::class.java.getResourceAsStream("/neo4j-source-configuration.properties"))
36-
properties!!.load(
35+
properties.load(
3736
PropertiesUtil::class.java.getResourceAsStream("/neo4j-sink-configuration.properties"))
38-
properties!!.load(
37+
properties.load(
3938
PropertiesUtil::class.java.getResourceAsStream("/kafka-connect-version.properties"))
40-
properties!!.load(
39+
properties.load(
4140
PropertiesUtil::class.java.getResourceAsStream("/kafka-connect-neo4j.properties"))
4241
VERSION =
4342
try {
44-
properties!!.getProperty("version", DEFAULT_VERSION).trim()
43+
properties.getProperty("version", DEFAULT_VERSION).trim()
4544
} catch (e: Exception) {
4645
LOGGER.warn("error while loading version:", e)
4746
DEFAULT_VERSION
4847
}
4948
}
5049

5150
fun getVersion(): String {
52-
return VERSION!!
51+
return VERSION
5352
}
5453

5554
fun getProperty(key: String): String {
56-
return properties!!.getProperty(key)
55+
return properties.getProperty(key)
5756
}
5857
}
5958
}

common/src/main/resources/neo4j-sink-configuration.properties

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@
1313
# limitations under the License.
1414
##
1515
## Connection Properties
16-
neo4j.topic.cdc.sourceId=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `SourceId` strategy
17-
neo4j.topic.cdc.sourceId.labelName=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
18-
neo4j.topic.cdc.sourceId.idName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
19-
neo4j.topic.cdc.schema=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `Schema` strategy
16+
neo4j.cdc.sourceId.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `SourceId` strategy
17+
neo4j.cdc.sourceId.labelName=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
18+
neo4j.cdc.sourceId.idName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
19+
neo4j.cdc.schema.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `Schema` strategy
2020
neo4j.batch-parallelize=Type: Boolean;\nDescription: If enabled messages are processed concurrently in the sink. \
2121
Non concurrent execution supports in-order processing, e.g. for CDC (default true)
22-
neo4j.topic.cud=Type: String;\nDescription: The topic list (separated by semicolon) that manages CUD events
23-
neo4j.topic.pattern.merge-node-properties=Type: Boolean;\nDescription: If enabled nodes properties will be merged when \
22+
neo4j.cud.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CUD events
23+
neo4j.pattern.node.merge-properties=Type: Boolean;\nDescription: If enabled nodes properties will be merged when \
2424
using Sink `Node pattern` strategy (default false). In case of using Sink `Relationship pattern` strategy edge nodes properties will be merged when \
2525
creating relationships (default false)
26-
neo4j.topic.pattern.merge-relationship-properties=Type: Boolean;\nDescription: If enabled relationships properties will be merged when creating relationships \
26+
neo4j.pattern.relationship.merge-properties=Type: Boolean;\nDescription: If enabled relationships properties will be merged when creating relationships \
2727
using Sink `Relationship pattern` strategy (default false)
2828
neo4j.batch-size=Type: Integer;\nDescription: Max number of messages processed per batch.
2929
neo4j.batch-timeout=Type: Duration;\nDescription: Maximum amount of time a batch is allowed to be processed.

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/DeprecatedNeo4jSinkConfiguration.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,28 @@ class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
3333
@Deprecated("deprecated in favour of ${SinkConfiguration.BATCH_PARALLELIZE}")
3434
const val BATCH_PARALLELIZE = "neo4j.batch.parallelize"
3535

36+
@Deprecated("deprecated in favour of ${SinkConfiguration.CYPHER_TOPIC_PREFIX}")
3637
const val TOPIC_CYPHER_PREFIX = "neo4j.topic.cypher."
38+
@Deprecated("deprecated in favour of ${SinkConfiguration.CDC_SOURCE_ID_TOPICS}")
3739
const val TOPIC_CDC_SOURCE_ID = "neo4j.topic.cdc.sourceId"
40+
@Deprecated("deprecated in favour of ${SinkConfiguration.CDC_SOURCE_ID_LABEL_NAME}")
3841
const val TOPIC_CDC_SOURCE_ID_LABEL_NAME = "neo4j.topic.cdc.sourceId.labelName"
42+
@Deprecated("deprecated in favour of ${SinkConfiguration.CDC_SOURCE_ID_ID_NAME}")
3943
const val TOPIC_CDC_SOURCE_ID_ID_NAME = "neo4j.topic.cdc.sourceId.idName"
44+
@Deprecated("deprecated in favour of ${SinkConfiguration.PATTERN_NODE_TOPIC_PREFIX}")
4045
const val TOPIC_PATTERN_NODE_PREFIX = "neo4j.topic.pattern.node."
46+
@Deprecated("deprecated in favour of ${SinkConfiguration.PATTERN_RELATIONSHIP_TOPIC_PREFIX}")
4147
const val TOPIC_PATTERN_RELATIONSHIP_PREFIX = "neo4j.topic.pattern.relationship."
42-
@Deprecated("deprecated in favour of ${SinkConfiguration.TOPIC_PATTERN_MERGE_NODE_PROPERTIES}")
48+
@Deprecated("deprecated in favour of ${SinkConfiguration.PATTERN_NODE_MERGE_PROPERTIES}")
4349
const val TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED =
4450
"neo4j.topic.pattern.merge.node.properties.enabled"
4551
@Deprecated(
46-
"deprecated in favour of ${SinkConfiguration.TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES}")
52+
"deprecated in favour of ${SinkConfiguration.PATTERN_RELATIONSHIP_MERGE_PROPERTIES}")
4753
const val TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED =
4854
"neo4j.topic.pattern.merge.relationship.properties.enabled"
55+
@Deprecated("deprecated in favour of ${SinkConfiguration.CDC_SCHEMA_TOPICS}")
4956
const val TOPIC_CDC_SCHEMA = "neo4j.topic.cdc.schema"
57+
@Deprecated("deprecated in favour of ${SinkConfiguration.CUD_TOPICS}")
5058
const val TOPIC_CUD = "neo4j.topic.cud"
5159

5260
const val DEFAULT_BATCH_PARALLELIZE = true

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt

Lines changed: 70 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class SinkConfiguration(originals: Map<*, *>) :
5151
val batchTimeout
5252
get(): Duration = Duration.parseSimpleString(getString(BATCH_TIMEOUT))
5353

54-
val topics: Topics by lazy { Topics.from(originals(), "streams.sink." to "neo4j.") }
54+
val topics: Topics by lazy { Topics.from(originals()) }
5555

5656
val strategyMap: Map<TopicType, Any> by lazy { TopicUtils.toStrategyMap(topics) }
5757

@@ -83,42 +83,62 @@ class SinkConfiguration(originals: Map<*, *>) :
8383
const val BATCH_TIMEOUT = "neo4j.batch-timeout"
8484
const val BATCH_PARALLELIZE = "neo4j.batch-parallelize"
8585

86-
const val TOPIC_CYPHER_PREFIX = "neo4j.topic.cypher."
87-
const val TOPIC_CDC_SOURCE_ID = "neo4j.topic.cdc.sourceId"
88-
const val TOPIC_CDC_SOURCE_ID_LABEL_NAME = "neo4j.topic.cdc.sourceId.labelName"
89-
const val TOPIC_CDC_SOURCE_ID_ID_NAME = "neo4j.topic.cdc.sourceId.idName"
90-
const val TOPIC_CDC_SCHEMA = "neo4j.topic.cdc.schema"
91-
const val TOPIC_PATTERN_NODE_PREFIX = "neo4j.topic.pattern.node."
92-
const val TOPIC_PATTERN_RELATIONSHIP_PREFIX = "neo4j.topic.pattern.relationship."
93-
const val TOPIC_PATTERN_MERGE_NODE_PROPERTIES = "neo4j.topic.pattern.merge-node-properties"
94-
const val TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES =
95-
"neo4j.topic.pattern.merge-relationship-properties"
96-
const val TOPIC_CUD = "neo4j.topic.cud"
86+
const val CYPHER_TOPIC_PREFIX = "neo4j.cypher.topic."
87+
const val CDC_SOURCE_ID_TOPICS = "neo4j.cdc.sourceId.topics"
88+
const val CDC_SOURCE_ID_LABEL_NAME = "neo4j.cdc.sourceId.labelName"
89+
const val CDC_SOURCE_ID_ID_NAME = "neo4j.cdc.sourceId.idName"
90+
const val CDC_SCHEMA_TOPICS = "neo4j.cdc.schema.topics"
91+
const val PATTERN_NODE_TOPIC_PREFIX = "neo4j.pattern.node.topic."
92+
const val PATTERN_RELATIONSHIP_TOPIC_PREFIX = "neo4j.pattern.relationship.topic."
93+
const val PATTERN_NODE_MERGE_PROPERTIES = "neo4j.pattern.node.merge-properties"
94+
const val PATTERN_RELATIONSHIP_MERGE_PROPERTIES = "neo4j.pattern.relationship.merge-properties"
95+
const val CUD_TOPICS = "neo4j.cud.topics"
9796

9897
const val DEFAULT_BATCH_SIZE = 1000
9998
val DEFAULT_BATCH_TIMEOUT = 0.seconds
10099
const val DEFAULT_BATCH_PARALLELIZE = true
101100
const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
102101
const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false
103102

103+
@JvmStatic
104+
val KEY_REPLACEMENTS =
105+
mapOf(
106+
DeprecatedNeo4jSinkConfiguration.TOPIC_CYPHER_PREFIX to CYPHER_TOPIC_PREFIX,
107+
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_NODE_PREFIX to PATTERN_NODE_TOPIC_PREFIX,
108+
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_RELATIONSHIP_PREFIX to
109+
PATTERN_RELATIONSHIP_TOPIC_PREFIX)
110+
104111
fun migrateSettings(oldSettings: Map<String, Any>): Map<String, String> {
105-
val migrated = Neo4jConfiguration.migrateSettings(oldSettings, true).toMutableMap()
112+
val migratedBase = Neo4jConfiguration.migrateSettings(oldSettings, false)
113+
val migrated = HashMap<String, String>(migratedBase.size)
106114

107-
oldSettings.forEach {
115+
migratedBase.forEach {
108116
when (it.key) {
109-
DeprecatedNeo4jConfiguration.BATCH_SIZE -> migrated[BATCH_SIZE] = it.value.toString()
117+
DeprecatedNeo4jConfiguration.BATCH_SIZE -> migrated[BATCH_SIZE] = it.value
110118
DeprecatedNeo4jConfiguration.BATCH_TIMEOUT_MSECS ->
111119
migrated[BATCH_TIMEOUT] = "${it.value}ms"
112120
DeprecatedNeo4jSinkConfiguration.BATCH_PARALLELIZE ->
113-
migrated[BATCH_PARALLELIZE] = it.value.toString()
121+
migrated[BATCH_PARALLELIZE] = it.value
114122
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED ->
115-
migrated[TOPIC_PATTERN_MERGE_NODE_PROPERTIES] = it.value.toString()
123+
migrated[PATTERN_NODE_MERGE_PROPERTIES] = it.value
116124
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED ->
117-
migrated[TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES] = it.value.toString()
118-
else ->
119-
if (!migrated.containsKey(it.key)) {
120-
migrated[it.key] = it.value.toString()
121-
}
125+
migrated[PATTERN_RELATIONSHIP_MERGE_PROPERTIES] = it.value
126+
DeprecatedNeo4jSinkConfiguration.TOPIC_CDC_SOURCE_ID ->
127+
migrated[CDC_SOURCE_ID_TOPICS] = it.value.replaceLegacyDelimiter()
128+
DeprecatedNeo4jSinkConfiguration.TOPIC_CDC_SOURCE_ID_LABEL_NAME ->
129+
migrated[CDC_SOURCE_ID_LABEL_NAME] = it.value
130+
DeprecatedNeo4jSinkConfiguration.TOPIC_CDC_SOURCE_ID_ID_NAME ->
131+
migrated[CDC_SOURCE_ID_ID_NAME] = it.value
132+
DeprecatedNeo4jSinkConfiguration.TOPIC_CDC_SCHEMA ->
133+
migrated[CDC_SCHEMA_TOPICS] = it.value.replaceLegacyDelimiter()
134+
DeprecatedNeo4jSinkConfiguration.TOPIC_CUD ->
135+
migrated[CUD_TOPICS] = it.value.replaceLegacyDelimiter()
136+
else -> {
137+
val migratedKey = replaceLegacyPropertyKeys(it.key)
138+
if (!migrated.containsKey(migratedKey)) {
139+
migrated[migratedKey] = it.value
140+
}
141+
}
122142
}
123143
}
124144

@@ -132,64 +152,62 @@ class SinkConfiguration(originals: Map<*, *>) :
132152
fun config(): ConfigDef =
133153
Neo4jConfiguration.config()
134154
.define(
135-
ConfigKeyBuilder.of(TOPIC_CDC_SOURCE_ID, ConfigDef.Type.LIST) {
155+
ConfigKeyBuilder.of(CDC_SOURCE_ID_TOPICS, ConfigDef.Type.LIST) {
136156
importance = ConfigDef.Importance.HIGH
137157
defaultValue = ""
138158
group = ConfigGroup.TOPIC_CYPHER_MAPPING
139159
})
140160
.define(
141-
ConfigKeyBuilder.of(TOPIC_CDC_SOURCE_ID_LABEL_NAME, ConfigDef.Type.STRING) {
161+
ConfigKeyBuilder.of(CDC_SOURCE_ID_LABEL_NAME, ConfigDef.Type.STRING) {
142162
importance = ConfigDef.Importance.HIGH
143163
defaultValue = SourceIdIngestionStrategyConfig.DEFAULT.labelName
144164
group = ConfigGroup.TOPIC_CYPHER_MAPPING
145165
recommender =
146-
Recommenders.visibleIfNotEmpty(Predicate.isEqual(TOPIC_CDC_SOURCE_ID))
166+
Recommenders.visibleIfNotEmpty(Predicate.isEqual(CDC_SOURCE_ID_TOPICS))
147167
})
148168
.define(
149-
ConfigKeyBuilder.of(TOPIC_CDC_SOURCE_ID_ID_NAME, ConfigDef.Type.STRING) {
169+
ConfigKeyBuilder.of(CDC_SOURCE_ID_ID_NAME, ConfigDef.Type.STRING) {
150170
importance = ConfigDef.Importance.HIGH
151171
defaultValue = SourceIdIngestionStrategyConfig.DEFAULT.idName
152172
group = ConfigGroup.TOPIC_CYPHER_MAPPING
153173
recommender =
154-
Recommenders.visibleIfNotEmpty(Predicate.isEqual(TOPIC_CDC_SOURCE_ID))
174+
Recommenders.visibleIfNotEmpty(Predicate.isEqual(CDC_SOURCE_ID_TOPICS))
155175
})
156176
.define(
157-
ConfigKeyBuilder.of(TOPIC_CDC_SCHEMA, ConfigDef.Type.LIST) {
177+
ConfigKeyBuilder.of(CDC_SCHEMA_TOPICS, ConfigDef.Type.LIST) {
158178
importance = ConfigDef.Importance.HIGH
159179
defaultValue = ""
160180
group = ConfigGroup.TOPIC_CYPHER_MAPPING
161181
})
162182
.define(
163-
ConfigKeyBuilder.of(TOPIC_CUD, ConfigDef.Type.LIST) {
183+
ConfigKeyBuilder.of(CUD_TOPICS, ConfigDef.Type.LIST) {
164184
importance = ConfigDef.Importance.HIGH
165185
defaultValue = ""
166186
group = ConfigGroup.TOPIC_CYPHER_MAPPING
167187
})
168188
.define(
169-
ConfigKeyBuilder.of(TOPIC_PATTERN_MERGE_NODE_PROPERTIES, ConfigDef.Type.BOOLEAN) {
189+
ConfigKeyBuilder.of(PATTERN_NODE_MERGE_PROPERTIES, ConfigDef.Type.BOOLEAN) {
170190
importance = ConfigDef.Importance.MEDIUM
171191
defaultValue = DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES
172192
group = ConfigGroup.TOPIC_CYPHER_MAPPING
173193
recommender =
174194
Recommenders.visibleIfNotEmpty { k ->
175-
k.startsWith(TOPIC_PATTERN_NODE_PREFIX) ||
176-
k.startsWith(TOPIC_PATTERN_RELATIONSHIP_PREFIX)
195+
k.startsWith(PATTERN_NODE_TOPIC_PREFIX) ||
196+
k.startsWith(PATTERN_RELATIONSHIP_TOPIC_PREFIX)
177197
}
178198
})
179199
.define(
180-
ConfigKeyBuilder.of(
181-
TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES, ConfigDef.Type.BOOLEAN) {
182-
documentation =
183-
PropertiesUtil.getProperty(TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES)
184-
importance = ConfigDef.Importance.MEDIUM
185-
defaultValue = DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES
186-
group = ConfigGroup.TOPIC_CYPHER_MAPPING
187-
recommender =
188-
Recommenders.visibleIfNotEmpty { k ->
189-
k.startsWith(TOPIC_PATTERN_NODE_PREFIX) ||
190-
k.startsWith(TOPIC_PATTERN_RELATIONSHIP_PREFIX)
191-
}
192-
})
200+
ConfigKeyBuilder.of(PATTERN_RELATIONSHIP_MERGE_PROPERTIES, ConfigDef.Type.BOOLEAN) {
201+
documentation = PropertiesUtil.getProperty(PATTERN_RELATIONSHIP_MERGE_PROPERTIES)
202+
importance = ConfigDef.Importance.MEDIUM
203+
defaultValue = DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES
204+
group = ConfigGroup.TOPIC_CYPHER_MAPPING
205+
recommender =
206+
Recommenders.visibleIfNotEmpty { k ->
207+
k.startsWith(PATTERN_NODE_TOPIC_PREFIX) ||
208+
k.startsWith(PATTERN_RELATIONSHIP_TOPIC_PREFIX)
209+
}
210+
})
193211
.define(
194212
ConfigKeyBuilder.of(BATCH_SIZE, ConfigDef.Type.INT) {
195213
importance = ConfigDef.Importance.HIGH
@@ -208,5 +226,12 @@ class SinkConfiguration(originals: Map<*, *>) :
208226
defaultValue = DEFAULT_BATCH_PARALLELIZE
209227
group = ConfigGroup.BATCH
210228
})
229+
230+
private fun replaceLegacyPropertyKeys(key: String) =
231+
KEY_REPLACEMENTS.entries.fold(key) { k, replacement ->
232+
k.replace(replacement.key, replacement.value)
233+
}
234+
235+
private fun String.replaceLegacyDelimiter() = this.replace(';', ',')
211236
}
212237
}

0 commit comments

Comments
 (0)