Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
<packaging>jar</packaging>
<name>common</name>
<description>Neo4j Connector for Kafka - Common</description>
<properties>
<maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.neo4j.connectors.kafka.configuration

import com.fasterxml.jackson.databind.util.ClassUtil.defaultValue
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
Expand All @@ -37,17 +36,12 @@ object ConfigGroup {
const val CONNECTION = "Connection"
const val AUTHENTICATION = "Authentication"
const val TOPIC_CYPHER_MAPPING = "Topic Cypher Mapping"
const val ERROR_REPORTING = "Error Reporting"
const val BATCH = "Batch Management"
const val RETRY = "Retry Strategy"
const val DEPRECATED = "Deprecated Properties (please check the documentation)"
}

open class DeprecatedNeo4jConfiguration(
configDef: ConfigDef,
originals: Map<*, *>,
private val type: ConnectorType
) : AbstractConfig(configDef, originals) {
open class DeprecatedNeo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>) :
AbstractConfig(configDef, originals) {

companion object {
@Deprecated("deprecated in favour of ${Neo4jConfiguration.URI}")
Expand Down Expand Up @@ -99,6 +93,7 @@ open class DeprecatedNeo4jConfiguration(
val CONNECTION_MAX_CONNECTION_LIFETIME_MSECS_DEFAULT = Duration.ofMinutes(8).toMillis()
val CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS_DEFAULT = Duration.ofMinutes(2).toMillis()

@Suppress("DEPRECATION")
fun config(): ConfigDef =
ConfigDef()
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.errors.ConnectException
import org.neo4j.connectors.kafka.configuration.helpers.ConfigUtils
import org.neo4j.connectors.kafka.configuration.helpers.Validators.validateNonEmptyIfVisible
import org.neo4j.connectors.kafka.configuration.helpers.parseSimpleString
Expand Down Expand Up @@ -108,7 +107,7 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
internal val authenticationToken
get(): AuthToken =
when (ConfigUtils.getEnum<AuthenticationType>(this, AUTHENTICATION_TYPE)) {
null -> throw ConnectException("Configuration '$AUTHENTICATION_TYPE' is not provided")
null -> AuthTokens.none()
AuthenticationType.NONE -> AuthTokens.none()
AuthenticationType.BASIC ->
AuthTokens.basic(
Expand Down Expand Up @@ -241,6 +240,7 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
val migrated = mutableMapOf<String, String>()

oldSettings.forEach {
@Suppress("DEPRECATION")
when (it.key) {
DeprecatedNeo4jConfiguration.SERVER_URI -> migrated[URI] = it.value.toString()
DeprecatedNeo4jConfiguration.CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.utils.PropertiesUtil

object ConfigUtils {
inline fun <reified E : Enum<E>> getEnum(config: AbstractConfig, key: String): E {
return enumValueOf<E>(config.getString(key))
inline fun <reified E : Enum<E>> getEnum(config: AbstractConfig, key: String): E? {
return try {
enumValueOf<E>(config.getString(key))
} catch (e: IllegalArgumentException) {
null
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD : Payload> :
private fun convertPoints(recordChange: RecordChange?, points: Set<String>) =
recordChange?.properties?.mapValues {
if (points.contains(it.key)) {
val pointMap = it.value as Map<String, Any>
val pointMap = it.value as Map<*, *>
when (pointMap["crs"]) {
"cartesian" ->
Values.point(
Expand Down Expand Up @@ -301,14 +301,6 @@ object JSONUtils {
return OBJECT_MAPPER.writeValueAsString(any)
}

fun writeValueAsBytes(any: Any): ByteArray {
return OBJECT_MAPPER.writeValueAsBytes(any)
}

inline fun <reified T> readValue(value: ByteArray): T {
return getObjectMapper().readValue(value, T::class.java)
}

inline fun <reified T> readValue(
value: Any,
stringWhenFailure: Boolean = false,
Expand All @@ -325,7 +317,6 @@ object JSONUtils {
val strValue =
when (value) {
is ByteArray -> String(value)
null -> ""
else -> value.toString()
}
strValue.trimStart().let { if (it[0] == '{' || it[0] == '[') throw e else it as T }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("DEPRECATION")

package streams.kafka.connect.sink

import org.apache.kafka.common.config.ConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("DEPRECATION")

package streams.kafka.connect.source

import org.apache.kafka.common.config.ConfigDef
Expand Down
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.3.1</version>
<configuration>
<propertiesEncoding>${project.build.sourceEncoding}</propertiesEncoding>
</configuration>
</plugin>
<plugin>
<groupId>org.jreleaser</groupId>
<artifactId>jreleaser-maven-plugin</artifactId>
Expand Down Expand Up @@ -295,6 +303,7 @@
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<configuration>
<args>-Werror</args>
<jvmTarget>${java.version}</jvmTarget>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package org.neo4j.connectors.kafka.sink

import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.configuration.ConfigGroup
import org.neo4j.connectors.kafka.configuration.ConnectorType
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.ConfigKeyBuilder
import org.neo4j.connectors.kafka.service.sink.strategy.SourceIdIngestionStrategyConfig
import org.neo4j.connectors.kafka.utils.PropertiesUtil

@Deprecated("use org.neo4j.connectors.kafka.sink.SinkConfiguration")
class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jConfiguration(config(), originals, ConnectorType.SINK) {
DeprecatedNeo4jConfiguration(config(), originals) {

companion object {

Expand Down Expand Up @@ -57,10 +56,11 @@ class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
@Deprecated("deprecated in favour of ${SinkConfiguration.CUD_TOPICS}")
const val TOPIC_CUD = "neo4j.topic.cud"

const val DEFAULT_BATCH_PARALLELIZE = true
const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false
private const val DEFAULT_BATCH_PARALLELIZE = true
private const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
private const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false

@Suppress("DEPRECATION")
fun config(): ConfigDef =
DeprecatedNeo4jConfiguration.config()
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.neo4j.connectors.kafka.sink

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil
Expand Down Expand Up @@ -47,6 +49,7 @@ class Neo4jSinkTask : SinkTask() {
log::error)
}

@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
override fun put(collection: Collection<SinkRecord>) {
if (collection.isEmpty()) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ class SinkConfiguration(originals: Map<*, *>) :
const val PATTERN_RELATIONSHIP_MERGE_PROPERTIES = "neo4j.pattern.relationship.merge-properties"
const val CUD_TOPICS = "neo4j.cud.topics"

const val DEFAULT_BATCH_SIZE = 1000
private const val DEFAULT_BATCH_SIZE = 1000
val DEFAULT_BATCH_TIMEOUT = 0.seconds
const val DEFAULT_BATCH_PARALLELIZE = true
const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false
private const val DEFAULT_BATCH_PARALLELIZE = true
private const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
private const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false

@Suppress("DEPRECATION")
@JvmStatic
val KEY_REPLACEMENTS =
mapOf(
Expand All @@ -108,8 +109,9 @@ class SinkConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_RELATIONSHIP_PREFIX to
PATTERN_RELATIONSHIP_TOPIC_PREFIX)

@Suppress("DEPRECATION")
fun migrateSettings(oldSettings: Map<String, Any>): Map<String, String> {
val migratedBase = Neo4jConfiguration.migrateSettings(oldSettings, false)
val migratedBase = migrateSettings(oldSettings, false)
val migrated = HashMap<String, String>(migratedBase.size)

migratedBase.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ open class MapValueConverter<T> : ValueConverter<MutableMap<String, T?>>() {
result: MutableMap<String, T?>?,
fieldName: String,
schema: Schema?,
value: MutableMap<Any?, Any?>?
map: MutableMap<Any?, Any?>?
) {
if (value != null) {
val converted = convert(value) as MutableMap<Any?, Any?>
if (map != null) {
val converted = convert(map) as MutableMap<Any?, Any?>
setValue(result, fieldName, converted)
} else {
setNullField(result, fieldName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class SinkConfigurationTest {
assertEquals(setOf("foo", "bar"), config.topics.cudTopics)
}

@Suppress("DEPRECATION")
@Test
fun `migrateSettings should replace deprecated settings with up-to-date equivalent`() {
val originals =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package org.neo4j.connectors.kafka.source

import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.configuration.ConnectorType
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.ConfigKeyBuilder
import org.neo4j.connectors.kafka.configuration.helpers.Recommenders
import org.neo4j.connectors.kafka.configuration.helpers.Validators

@Deprecated("use org.neo4j.connectors.kafka.source.SourceConfiguration")
class DeprecatedNeo4jSourceConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jConfiguration(config(), originals, ConnectorType.SOURCE) {
DeprecatedNeo4jConfiguration(config(), originals) {

enum class StreamingFrom {
ALL,
Expand All @@ -48,6 +47,7 @@ class DeprecatedNeo4jSourceConfiguration(originals: Map<*, *>) :
@Deprecated("deprecated in favour of ${SourceConfiguration.QUERY}")
const val SOURCE_TYPE_QUERY = "neo4j.source.query"

@Suppress("DEPRECATION")
fun config(): ConfigDef =
DeprecatedNeo4jConfiguration.config()
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ class SourceConfiguration(originals: Map<*, *>) :
private val DEFAULT_CDC_POLL_INTERVAL = 1.seconds
private val DEFAULT_CDC_POLL_DURATION = 5.seconds

@Suppress("DEPRECATION")
fun migrateSettings(oldSettings: Map<String, Any>): Map<String, String> {
val migrated = Neo4jConfiguration.migrateSettings(oldSettings, true).toMutableMap()
val migrated = migrateSettings(oldSettings, true).toMutableMap()

oldSettings.forEach {
when (it.key) {
Expand Down Expand Up @@ -416,7 +417,7 @@ class SourceConfiguration(originals: Map<*, *>) :
}

fun validate(config: Config, originals: Map<String, String>) {
Neo4jConfiguration.validate(config)
validate(config)

// START_FROM user defined validation
config.validateNonEmptyIfVisible(START_FROM_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration

class DeprecatedNeo4jSourceConfigurationTest {

@Suppress("DEPRECATION")
@Test
fun `should not allow cdc as source type`() {
assertFailsWith(ConfigException::class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.ParameterizedTest.DISPLAY_NAME_PLACEHOLDER
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mockito.any
import org.mockito.kotlin.any
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.inOrder
Expand Down Expand Up @@ -161,6 +160,7 @@ class Neo4jSourceExtensionTest {

@ParameterizedTest(name = "$DISPLAY_NAME_PLACEHOLDER [{0}]")
@MethodSource("validMethods")
@Suppress("UNUSED_PARAMETER") // Kotlin compiler not smart enough to see name param is used
fun `resolves Session parameter`(name: String, method: KFunction<Unit>) {
val session = mock<Session>()
val driver =
Expand All @@ -181,6 +181,7 @@ class Neo4jSourceExtensionTest {

@ParameterizedTest(name = "$DISPLAY_NAME_PLACEHOLDER [{0}]")
@MethodSource("validMethods")
@Suppress("UNUSED_PARAMETER") // Kotlin compiler not smart enough to see name param is used
fun `resolves consumer parameter`(name: String, method: KFunction<Unit>) {
val consumer = mock<KafkaConsumer<String, GenericRecord>>()
val extension = Neo4jSourceExtension(consumerFactory = { _, _ -> consumer })
Expand Down