Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.kotest</groupId>
<artifactId>kotest-assertions-core-jvm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigValue
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.connect.errors.ConnectException
import org.neo4j.connectors.kafka.configuration.helpers.ConfigUtils
import org.neo4j.connectors.kafka.configuration.helpers.Validators.validateNonEmptyIfVisible
import org.neo4j.connectors.kafka.configuration.helpers.parseSimpleString
import org.neo4j.driver.AccessMode
import org.neo4j.driver.AuthToken
Expand Down Expand Up @@ -279,34 +278,21 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
}

/** Perform validation on dependent configuration items */
fun validate(config: org.apache.kafka.common.config.Config) {
val values = config.configValues()

validateNonEmptyIfVisible(values, AUTHENTICATION_BASIC_USERNAME)
validateNonEmptyIfVisible(values, AUTHENTICATION_BASIC_PASSWORD)
validateNonEmptyIfVisible(values, AUTHENTICATION_KERBEROS_TICKET)
validateNonEmptyIfVisible(values, AUTHENTICATION_BEARER_TOKEN)
validateNonEmptyIfVisible(values, AUTHENTICATION_CUSTOM_PRINCIPAL)
validateNonEmptyIfVisible(values, AUTHENTICATION_CUSTOM_CREDENTIALS)
validateNonEmptyIfVisible(values, AUTHENTICATION_CUSTOM_SCHEME)
}

protected fun validateNonEmptyIfVisible(values: MutableList<ConfigValue>, name: String) {
values
.first { it.name() == name }
.run {
if (this.visible() &&
(when (val value = this.value()) {
is String? -> value
is Password? -> value?.value()
else ->
throw IllegalArgumentException(
"unexpected value '$value' for configuration $name")
})
.isNullOrBlank()) {
this.addErrorMessage("Must be non-empty.")
}
}
fun validate(config: org.apache.kafka.common.config.Config, originals: Map<String, String>) {
// authentication configuration
config.validateNonEmptyIfVisible(AUTHENTICATION_BASIC_USERNAME)
config.validateNonEmptyIfVisible(AUTHENTICATION_BASIC_PASSWORD)
config.validateNonEmptyIfVisible(AUTHENTICATION_KERBEROS_TICKET)
config.validateNonEmptyIfVisible(AUTHENTICATION_BEARER_TOKEN)
config.validateNonEmptyIfVisible(AUTHENTICATION_CUSTOM_PRINCIPAL)
config.validateNonEmptyIfVisible(AUTHENTICATION_CUSTOM_CREDENTIALS)
config.validateNonEmptyIfVisible(AUTHENTICATION_CUSTOM_SCHEME)

// security configuration
config.validateNonEmptyIfVisible(SECURITY_ENCRYPTED)
config.validateNonEmptyIfVisible(SECURITY_HOST_NAME_VERIFICATION_ENABLED)
config.validateNonEmptyIfVisible(SECURITY_TRUST_STRATEGY)
config.validateNonEmptyIfVisible(SECURITY_CERT_FILES)
}

fun config(): ConfigDef =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -92,6 +93,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -106,6 +108,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[maybe for another PR]
while I understand this translates 1:1 with Kafka's org.apache.kafka.common.config.ConfigDef.ConfigKey, it feels to me dependents kinda overlaps with recommender.
Since we own the builder, is there a way to automatically extract dependents from the configured recommenders?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, good idea - I also thought of moving the PropertiesUtil invocations into the builder too - will do it separately.

recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -120,6 +123,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -134,6 +138,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -148,6 +153,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -163,6 +169,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -177,6 +184,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand All @@ -191,6 +199,7 @@ fun ConfigDef.defineConnectionSettings(): ConfigDef =
group = CONNECTION.title
importance = Importance.HIGH
defaultValue = ""
dependents = listOf(Neo4jConfiguration.AUTHENTICATION_TYPE)
recommender =
Recommenders.visibleIf(
Neo4jConfiguration.AUTHENTICATION_TYPE,
Expand Down Expand Up @@ -226,8 +235,8 @@ fun ConfigDef.defineEncryptionSettings(): ConfigDef =
documentation = PropertiesUtil.getProperty(Neo4jConfiguration.SECURITY_TRUST_STRATEGY)
group = ADVANCED.title
importance = Importance.LOW
dependents = listOf(Neo4jConfiguration.URI, Neo4jConfiguration.SECURITY_ENCRYPTED)
validator = Validators.enum(Strategy::class.java)
dependents = listOf(Neo4jConfiguration.URI, Neo4jConfiguration.SECURITY_ENCRYPTED)
recommender =
Recommenders.and(
Recommenders.enum(Strategy::class.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import java.io.File
import java.net.URI
import java.net.URISyntaxException
import java.util.regex.Pattern
import org.apache.kafka.common.config.Config
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.types.Password

object Validators {

Expand Down Expand Up @@ -64,6 +66,22 @@ object Validators {
}
}

fun notBlank(): ConfigDef.Validator {
return ConfigDef.Validator { name, value ->
if (value is String) {
if (value.isEmpty()) {
throw ConfigException(name, value, "Must not be blank.")
}
} else if (value is List<*>) {
if (value.isEmpty()) {
throw ConfigException(name, value, "Must not be empty.")
}
} else {
throw ConfigException(name, value, "Must be a String or a List.")
}
}
}

fun string(vararg values: String): ConfigDef.Validator {
return object : ConfigDef.Validator {
override fun ensureValid(name: String?, value: Any?) {
Expand Down Expand Up @@ -114,7 +132,7 @@ object Validators {
when (value) {
is String -> {
if (value.isBlank()) {
throw ConfigException(name, value, "Must be non-empty.")
throw ConfigException(name, value, "Must not be blank.")
}

try {
Expand All @@ -130,7 +148,7 @@ object Validators {
}
is List<*> -> {
if (value.isEmpty()) {
throw ConfigException(name, value, "Must be non-empty.")
throw ConfigException(name, value, "Must not be blank.")
}

value.forEach { ensureValid(name, it) }
Expand All @@ -148,7 +166,7 @@ object Validators {
override fun ensureValid(name: String?, value: Any?) {
if (value is String) {
if (value.isBlank()) {
throw ConfigException(name, value, "Must be non-empty.")
throw ConfigException(name, value, "Must not be blank.")
}

val file = File(value)
Expand All @@ -166,7 +184,7 @@ object Validators {
}
} else if (value is List<*>) {
if (value.isEmpty()) {
throw ConfigException(name, value, "Must be non-empty.")
throw ConfigException(name, value, "Must not be blank.")
}

value.forEach { ensureValid(name, it) }
Expand All @@ -176,4 +194,24 @@ object Validators {
}
}
}

fun Config.validateNonEmptyIfVisible(name: String) {
this.configValues()
.first { it.name() == name }
.let { config ->
if (config.visible() &&
(when (val value = config.value()) {
is Int? -> value == null
is Boolean? -> value == null
is String? -> value.isNullOrEmpty()
is Password? -> value?.value().isNullOrEmpty()
is List<*>? -> value.isEmpty()
else ->
throw IllegalArgumentException(
"unexpected value '$value' for configuration $name")
})) {
config.addErrorMessage("Invalid value for configuration $name: Must not be blank.")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
# limitations under the License.
##
## Connection Properties
neo4j.stream-from=Type: Enum<ALL, NOW, LAST_COMMITTED>;\nDescription: A time anchor to start streaming from.
neo4j.start-from=Type: Enum<EARLIEST, NOW, USER_PROVIDED>;\nDescription: A time anchor to start streaming from.
neo4j.start-from.value=Type: STRING|LONG;\nDescription: Custom value to use as a starting offset. Used once during the initial run of the connector, and will be ignored if there is an offset stored in Kafka Connect.
neo4j.source-strategy=Type: Enum<QUERY>;\nDescription: Source strategy for this connector.
neo4j.query=Type: String;\nDescription: Cypher query to gather changes. Requires both `neo4j.query.streaming-property` to be in the result set, and `$lastCheck` query parameter for tracking changes.
neo4j.query.streaming-property=Type: String;\nDescription: Property name that is both present in the result set of the specified query and used as a filter to query changes from a previous value.
neo4j.query.poll-interval=Type: String;\nDescription: Interval in which the query is executed.
topic=Type: String;\nDescription: Kafka topic to push gathered change messages.
neo4j.enforce-schema=Type: Boolean;\nDescription: Whether to attach schema to produced change messages.
neo4j.query.batch-size=Type: Integer;\nDescription: Max number of messages pushed for each poll cycle.
neo4j.batch-size=Type: Integer;\nDescription: Max number of messages pushed for each poll cycle.
neo4j.query.timeout=Type: Duration;\nDescription: Maximum amount of time source query is allowed to run.
neo4j.cdc.poll-interval=Type: Duration;\nDescription: The interval at which the database will be queried for change data.
neo4j.cdc.poll-duration=Type: Duration;\nDescription: The maximum duration a poll request will wait for a change to be received from the database.
neo4j.ignore-stored-offset=Type: Boolean;\nDescription: Whether to ignore any offset value retrieved from the offset storage saved by a previous run.
Loading