Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object StreamsTransactionEventExtensions {
this.meta.username,
this.meta.username,
"unknown",
"unknown",
CaptureMode.OFF,
"unknown",
"unknown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ class ChangeEventExtensionsTest {
"service",
"neo4j",
"server-1",
"neo4j",
CaptureMode.DIFF,
"bolt",
"127.0.0.1:32000",
Expand Down Expand Up @@ -1291,6 +1292,7 @@ class ChangeEventExtensionsTest {
"service",
"neo4j",
"server-1",
"neo4j",
CaptureMode.DIFF,
"bolt",
"127.0.0.1:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ class StreamsTransactionEventExtensionsTest {
"user",
"user",
"unknown",
"unknown",
CaptureMode.OFF,
"unknown",
"unknown",
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<awaitility.version>4.2.1</awaitility.version>
<build-resources.version>2024-07.1</build-resources.version>
<byte-buddy.version>1.14.17</byte-buddy.version>
<cdc.version>1.0.6</cdc.version>
<cdc.version>1.0.7</cdc.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<hamcrest.version>2.2</hamcrest.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ abstract class Neo4jCdcSchemaIT {
"neo4j",
"neo4j",
"server-id",
"neo4j",
CaptureMode.DIFF,
"bolt",
"localhost:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ abstract class Neo4jCdcSourceIdIT {
"neo4j",
"neo4j",
"server-id",
"neo4j",
CaptureMode.DIFF,
"bolt",
"localhost:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ abstract class Neo4jSinkErrorIT {
"neo4j",
"neo4j",
"server-id",
"neo4j",
CaptureMode.DIFF,
"bolt",
"localhost:32000",
Expand Down
2 changes: 1 addition & 1 deletion sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<name>sink</name>
<description>Neo4j Connector for Kafka - Sink</description>
<properties>
<antlr4.version>4.13.1</antlr4.version>
<antlr4.version>4.13.2</antlr4.version>
<json-schema-validator.version>1.5.0</json-schema-validator.version>
</properties>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ object TestUtils {
"service",
"neo4j",
"server-1",
"neo4j",
CaptureMode.DIFF,
"bolt",
"127.0.0.1:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Neo4jCdcTask : SourceTask() {
CDCClient(
config.driver,
{ sessionConfig },
{ config.txConfig() },
config.cdcPollingInterval.toJavaDuration(),
*config.cdcSelectors.toTypedArray())
log.debug("constructed cdc client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class SourceConfiguration(originals: Map<*, *>) :
SourceType.CDC -> {
val configMap = mutableMapOf<String, MutableList<Pattern>>()
val nonPositionalConfigMode = mutableMapOf<String, Boolean>()
val patternTxMetadataMap = mutableMapOf<Pattern, MutableMap<String, Any>>()

originals()
.entries
Expand Down Expand Up @@ -151,7 +152,7 @@ class SourceConfiguration(originals: Map<*, *>) :
.entries
.filter { CDC_PATTERN_ARRAY_METADATA_REGEX.matches(it.key) }
.map { CdcPatternConfigItem(it, CDC_PATTERN_ARRAY_METADATA_REGEX) }
.forEach { mapMetadata(it, nonPositionalConfigMode, configMap) }
.forEach { mapMetadata(it, nonPositionalConfigMode, configMap, patternTxMetadataMap) }

pivotMapCdcSelectorMap(configMap)
}
Expand Down Expand Up @@ -253,28 +254,34 @@ class SourceConfiguration(originals: Map<*, *>) :
private fun mapMetadata(
configEntry: CdcPatternConfigItem,
nonPositionalConfigMode: MutableMap<String, Boolean>,
configMap: MutableMap<String, MutableList<Pattern>>
configMap: MutableMap<String, MutableList<Pattern>>,
patternTxMetadataMap: MutableMap<Pattern, MutableMap<String, Any>>
) {
val (index, patterns) = retrieveIndexAndPattern(configEntry, nonPositionalConfigMode, configMap)
val keyValue = configEntry.metadata!!
var value = configEntry.value
val metadataKey = configEntry.metadata!!
val value = configEntry.value
val pattern = patterns[index]
if (keyValue.startsWith(EntitySelector.METADATA_KEY_TX_METADATA + '.')) {
value =
mapOf(
keyValue.removePrefix(EntitySelector.METADATA_KEY_TX_METADATA + '.') to value,
)
val metadata =
mapOf(
EntitySelector.METADATA_KEY_TX_METADATA to value,
)
pattern.withMetadata(metadata)
if (metadataKey.startsWith("$METADATA_KEY_TX_METADATA.")) {
val txMetadataKey = metadataKey.removePrefix("$METADATA_KEY_TX_METADATA.")

var txMetadata = patternTxMetadataMap[pattern]
if (txMetadata == null) {
txMetadata = mutableMapOf(txMetadataKey to value)
} else {
txMetadata[txMetadataKey] = value
}

pattern.withTxMetadata(txMetadata)
patternTxMetadataMap[pattern] = txMetadata
} else if (metadataKey == METADATA_KEY_EXECUTING_USER) {
pattern.withExecutingUser(value as String)
} else if (metadataKey == METADATA_KEY_AUTHENTICATED_USER) {
pattern.withAuthenticatedUser(value as String)
} else {
val metadata =
mapOf(
keyValue to value,
)
pattern.withMetadata(metadata)
throw ConfigException(
"Unexpected metadata key: '$metadataKey' found in configuration property '${configEntry.key}'. " +
"Valid keys are '$METADATA_KEY_AUTHENTICATED_USER', '$METADATA_KEY_EXECUTING_USER', " +
"or keys starting with '$METADATA_KEY_TX_METADATA.*'.")
}
}

Expand Down Expand Up @@ -345,11 +352,36 @@ class SourceConfiguration(originals: Map<*, *>) :
cdcSelectorsToTopics.keys
.map {
when (it) {
is NodeSelector -> NodeSelector(it.change, it.changesTo, it.labels, it.key, it.metadata)
is NodeSelector ->
NodeSelector.builder()
.withOperation(it.operation)
.withChangesTo(it.changesTo)
.withLabels(it.labels)
.withKey(it.key)
.withTxMetadata(it.txMetadata)
.withExecutingUser(it.executingUser)
.withAuthenticatedUser(it.authenticatedUser)
.build()
is RelationshipSelector ->
RelationshipSelector(
it.change, it.changesTo, it.type, it.start, it.end, it.key, it.metadata)
is EntitySelector -> EntitySelector(it.change, it.changesTo, it.metadata)
RelationshipSelector.builder()
.withOperation(it.operation)
.withChangesTo(it.changesTo)
.withType(it.type)
.withStart(it.start)
.withEnd(it.end)
.withKey(it.key)
.withTxMetadata(it.txMetadata)
.withExecutingUser(it.executingUser)
.withAuthenticatedUser(it.authenticatedUser)
.build()
is EntitySelector ->
EntitySelector.builder()
.withOperation(it.operation)
.withChangesTo(it.changesTo)
.withTxMetadata(it.txMetadata)
.withExecutingUser(it.executingUser)
.withAuthenticatedUser(it.authenticatedUser)
.build()
else -> throw IllegalStateException("unexpected pattern type ${it.javaClass.name}")
}
}
Expand Down Expand Up @@ -459,6 +491,10 @@ class SourceConfiguration(originals: Map<*, *>) :
private val DEFAULT_CDC_POLL_DURATION = 5.seconds
private const val DEFAULT_STREAMING_PROPERTY = "timestamp"

private const val METADATA_KEY_AUTHENTICATED_USER = "authenticatedUser"
private const val METADATA_KEY_EXECUTING_USER = "executingUser"
private const val METADATA_KEY_TX_METADATA = "txMetadata"

fun validate(config: Config, originals: Map<String, String>) {
validate(config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ object TestData {
"authenticated-user",
"executing-user",
"server-id",
"neo4j",
CaptureMode.FULL,
"connection-type",
"connection-client",
Expand Down
Loading