Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.
##
## Connection Properties
neo4j.cdc.sourceId.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `SourceId` strategy
neo4j.cdc.sourceId.labelName=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
neo4j.cdc.sourceId.propertyName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
neo4j.cdc.source-id.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `SourceId` strategy
neo4j.cdc.source-id.label-name=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
neo4j.cdc.source-id.property-name=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
neo4j.cdc.schema.topics=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `Schema` strategy
neo4j.batch-parallelize=Type: Boolean;\nDescription: If enabled messages are processed concurrently in the sink. \
Non concurrent execution supports in-order processing, e.g. for CDC (default true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ class SinkConfiguration(originals: Map<String, *>) :
const val BATCH_PARALLELIZE = "neo4j.batch-parallelize"

const val CYPHER_TOPIC_PREFIX = "neo4j.cypher.topic."
const val CDC_SOURCE_ID_TOPICS = "neo4j.cdc.sourceId.topics"
const val CDC_SOURCE_ID_LABEL_NAME = "neo4j.cdc.sourceId.labelName"
const val CDC_SOURCE_ID_PROPERTY_NAME = "neo4j.cdc.sourceId.propertyName"
const val CDC_SOURCE_ID_TOPICS = "neo4j.cdc.source-id.topics"
const val CDC_SOURCE_ID_LABEL_NAME = "neo4j.cdc.source-id.label-name"
const val CDC_SOURCE_ID_PROPERTY_NAME = "neo4j.cdc.source-id.property-name"
const val CDC_SCHEMA_TOPICS = "neo4j.cdc.schema.topics"
const val PATTERN_NODE_TOPIC_PREFIX = "neo4j.pattern.node.topic."
const val PATTERN_RELATIONSHIP_TOPIC_PREFIX = "neo4j.pattern.relationship.topic."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ internal class Neo4jSinkExtension(
if (metadata.cdcSourceId.isNotEmpty()) {
val resolved = metadata.cdcSourceId.map { topicRegistry.resolveTopic(it.topic) }
topics.addAll(resolved)
strategies["neo4j.cdc.sourceId.topics"] = resolved.joinToString(",")
strategies["neo4j.cdc.source-id.topics"] = resolved.joinToString(",")

val labelName = metadata.cdcSourceId.first().labelName
if (labelName.isNotBlank()) {
strategies["neo4j.cdc.sourceId.labelName"] = labelName
strategies["neo4j.cdc.source-id.label-name"] = labelName
}
val propertyName = metadata.cdcSourceId.first().propertyName
if (propertyName.isNotBlank()) {
strategies["neo4j.cdc.sourceId.propertyName"] = propertyName
strategies["neo4j.cdc.source-id.property-name"] = propertyName
}
}

Expand Down