diff --git a/common/pom.xml b/common/pom.xml
index 9ea1ffcf5..1cfe72ca1 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -11,9 +11,6 @@
jar
common
Neo4j Connector for Kafka - Common
-
- 3.3.1
-
com.fasterxml.jackson.module
diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/DeprecatedNeo4jConfiguration.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/DeprecatedNeo4jConfiguration.kt
index 4eef54de2..17530b6c4 100644
--- a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/DeprecatedNeo4jConfiguration.kt
+++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/DeprecatedNeo4jConfiguration.kt
@@ -16,7 +16,6 @@
*/
package org.neo4j.connectors.kafka.configuration
-import com.fasterxml.jackson.databind.util.ClassUtil.defaultValue
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
@@ -37,17 +36,12 @@ object ConfigGroup {
const val CONNECTION = "Connection"
const val AUTHENTICATION = "Authentication"
const val TOPIC_CYPHER_MAPPING = "Topic Cypher Mapping"
- const val ERROR_REPORTING = "Error Reporting"
const val BATCH = "Batch Management"
const val RETRY = "Retry Strategy"
- const val DEPRECATED = "Deprecated Properties (please check the documentation)"
}
-open class DeprecatedNeo4jConfiguration(
- configDef: ConfigDef,
- originals: Map<*, *>,
- private val type: ConnectorType
-) : AbstractConfig(configDef, originals) {
+open class DeprecatedNeo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>) :
+ AbstractConfig(configDef, originals) {
companion object {
@Deprecated("deprecated in favour of ${Neo4jConfiguration.URI}")
@@ -99,6 +93,7 @@ open class DeprecatedNeo4jConfiguration(
val CONNECTION_MAX_CONNECTION_LIFETIME_MSECS_DEFAULT = Duration.ofMinutes(8).toMillis()
val CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS_DEFAULT = Duration.ofMinutes(2).toMillis()
+ @Suppress("DEPRECATION")
fun config(): ConfigDef =
ConfigDef()
.define(
diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt
index 465a597a4..450303a6c 100644
--- a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt
+++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt
@@ -241,6 +241,7 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
val migrated = mutableMapOf()
oldSettings.forEach {
+ @Suppress("DEPRECATION")
when (it.key) {
DeprecatedNeo4jConfiguration.SERVER_URI -> migrated[URI] = it.value.toString()
DeprecatedNeo4jConfiguration.CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS ->
diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/helpers/ConfigUtils.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/helpers/ConfigUtils.kt
index 191a547c7..8c34e1f62 100644
--- a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/helpers/ConfigUtils.kt
+++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/helpers/ConfigUtils.kt
@@ -21,8 +21,12 @@ import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.utils.PropertiesUtil
object ConfigUtils {
- inline fun > getEnum(config: AbstractConfig, key: String): E {
- return enumValueOf(config.getString(key))
+ inline fun > getEnum(config: AbstractConfig, key: String): E? {
+ return try {
+ enumValueOf(config.getString(key))
+ } catch (e: IllegalArgumentException) {
+ null
+ }
}
}
diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/utils/JSONUtils.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/utils/JSONUtils.kt
index f9573cff6..1f6829f0f 100644
--- a/common/src/main/kotlin/org/neo4j/connectors/kafka/utils/JSONUtils.kt
+++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/utils/JSONUtils.kt
@@ -237,7 +237,7 @@ abstract class StreamsTransactionEventDeserializer :
private fun convertPoints(recordChange: RecordChange?, points: Set) =
recordChange?.properties?.mapValues {
if (points.contains(it.key)) {
- val pointMap = it.value as Map
+ val pointMap = it.value as Map<*, *>
when (pointMap["crs"]) {
"cartesian" ->
Values.point(
@@ -301,14 +301,6 @@ object JSONUtils {
return OBJECT_MAPPER.writeValueAsString(any)
}
- fun writeValueAsBytes(any: Any): ByteArray {
- return OBJECT_MAPPER.writeValueAsBytes(any)
- }
-
- inline fun readValue(value: ByteArray): T {
- return getObjectMapper().readValue(value, T::class.java)
- }
-
inline fun readValue(
value: Any,
stringWhenFailure: Boolean = false,
@@ -325,7 +317,6 @@ object JSONUtils {
val strValue =
when (value) {
is ByteArray -> String(value)
- null -> ""
else -> value.toString()
}
strValue.trimStart().let { if (it[0] == '{' || it[0] == '[') throw e else it as T }
diff --git a/legacy-connectors/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt b/legacy-connectors/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt
index 2d9e96e66..d03fa22b1 100644
--- a/legacy-connectors/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt
+++ b/legacy-connectors/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt
@@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+@file:Suppress("DEPRECATION")
+
package streams.kafka.connect.sink
import org.apache.kafka.common.config.ConfigDef
diff --git a/legacy-connectors/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt b/legacy-connectors/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt
index efe0e6103..9b9fd673a 100644
--- a/legacy-connectors/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt
+++ b/legacy-connectors/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt
@@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+@file:Suppress("DEPRECATION")
+
package streams.kafka.connect.source
import org.apache.kafka.common.config.ConfigDef
diff --git a/pom.xml b/pom.xml
index a0e8ee017..6785e9659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -245,6 +245,14 @@
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 3.3.1
+
+ ${project.build.sourceEncoding}
+
+
org.jreleaser
jreleaser-maven-plugin
@@ -295,6 +303,7 @@
kotlin-maven-plugin
${kotlin.version}
+ -Werror
${java.version}
diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/DeprecatedNeo4jSinkConfiguration.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/DeprecatedNeo4jSinkConfiguration.kt
index 38f30f6ba..f5fae92ea 100644
--- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/DeprecatedNeo4jSinkConfiguration.kt
+++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/DeprecatedNeo4jSinkConfiguration.kt
@@ -18,7 +18,6 @@ package org.neo4j.connectors.kafka.sink
import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.configuration.ConfigGroup
-import org.neo4j.connectors.kafka.configuration.ConnectorType
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.ConfigKeyBuilder
import org.neo4j.connectors.kafka.service.sink.strategy.SourceIdIngestionStrategyConfig
@@ -26,7 +25,7 @@ import org.neo4j.connectors.kafka.utils.PropertiesUtil
@Deprecated("use org.neo4j.connectors.kafka.sink.SinkConfiguration")
class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
- DeprecatedNeo4jConfiguration(config(), originals, ConnectorType.SINK) {
+ DeprecatedNeo4jConfiguration(config(), originals) {
companion object {
@@ -57,10 +56,11 @@ class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
@Deprecated("deprecated in favour of ${SinkConfiguration.CUD_TOPICS}")
const val TOPIC_CUD = "neo4j.topic.cud"
- const val DEFAULT_BATCH_PARALLELIZE = true
- const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
- const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false
+ private const val DEFAULT_BATCH_PARALLELIZE = true
+ private const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
+ private const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false
+ @Suppress("DEPRECATION")
fun config(): ConfigDef =
DeprecatedNeo4jConfiguration.config()
.define(
diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt
index 9756b918d..2bf461e8c 100644
--- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt
+++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt
@@ -16,6 +16,8 @@
*/
package org.neo4j.connectors.kafka.sink
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.ObsoleteCoroutinesApi
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil
@@ -47,6 +49,7 @@ class Neo4jSinkTask : SinkTask() {
log::error)
}
+ @OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
override fun put(collection: Collection) {
if (collection.isEmpty()) {
return
diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt
index ea30bbc61..a81e55493 100644
--- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt
+++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt
@@ -94,12 +94,13 @@ class SinkConfiguration(originals: Map<*, *>) :
const val PATTERN_RELATIONSHIP_MERGE_PROPERTIES = "neo4j.pattern.relationship.merge-properties"
const val CUD_TOPICS = "neo4j.cud.topics"
- const val DEFAULT_BATCH_SIZE = 1000
+ private const val DEFAULT_BATCH_SIZE = 1000
val DEFAULT_BATCH_TIMEOUT = 0.seconds
- const val DEFAULT_BATCH_PARALLELIZE = true
- const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
- const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false
+ private const val DEFAULT_BATCH_PARALLELIZE = true
+ private const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
+ private const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false
+ @Suppress("DEPRECATION")
@JvmStatic
val KEY_REPLACEMENTS =
mapOf(
@@ -108,8 +109,9 @@ class SinkConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_RELATIONSHIP_PREFIX to
PATTERN_RELATIONSHIP_TOPIC_PREFIX)
+ @Suppress("DEPRECATION")
fun migrateSettings(oldSettings: Map): Map {
- val migratedBase = Neo4jConfiguration.migrateSettings(oldSettings, false)
+ val migratedBase = migrateSettings(oldSettings, false)
val migrated = HashMap(migratedBase.size)
migratedBase.forEach {
diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/MapValueConverter.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/MapValueConverter.kt
index 6bfd6e228..45cc45109 100644
--- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/MapValueConverter.kt
+++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/MapValueConverter.kt
@@ -85,10 +85,10 @@ open class MapValueConverter : ValueConverter>() {
result: MutableMap?,
fieldName: String,
schema: Schema?,
- value: MutableMap?
+ map: MutableMap?
) {
- if (value != null) {
- val converted = convert(value) as MutableMap
+ if (map != null) {
+ val converted = convert(map) as MutableMap
setValue(result, fieldName, converted)
} else {
setNullField(result, fieldName)
diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt
index 20515e57d..63e290cd2 100644
--- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt
+++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt
@@ -161,6 +161,7 @@ class SinkConfigurationTest {
assertEquals(setOf("foo", "bar"), config.topics.cudTopics)
}
+ @Suppress("DEPRECATION")
@Test
fun `migrateSettings should replace deprecated settings with up-to-date equivalent`() {
val originals =
diff --git a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfiguration.kt b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfiguration.kt
index 558a18555..43e85bf29 100644
--- a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfiguration.kt
+++ b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfiguration.kt
@@ -17,7 +17,6 @@
package org.neo4j.connectors.kafka.source
import org.apache.kafka.common.config.ConfigDef
-import org.neo4j.connectors.kafka.configuration.ConnectorType
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.ConfigKeyBuilder
import org.neo4j.connectors.kafka.configuration.helpers.Recommenders
@@ -25,7 +24,7 @@ import org.neo4j.connectors.kafka.configuration.helpers.Validators
@Deprecated("use org.neo4j.connectors.kafka.source.SourceConfiguration")
class DeprecatedNeo4jSourceConfiguration(originals: Map<*, *>) :
- DeprecatedNeo4jConfiguration(config(), originals, ConnectorType.SOURCE) {
+ DeprecatedNeo4jConfiguration(config(), originals) {
enum class StreamingFrom {
ALL,
@@ -48,6 +47,7 @@ class DeprecatedNeo4jSourceConfiguration(originals: Map<*, *>) :
@Deprecated("deprecated in favour of ${SourceConfiguration.QUERY}")
const val SOURCE_TYPE_QUERY = "neo4j.source.query"
+ @Suppress("DEPRECATION")
fun config(): ConfigDef =
DeprecatedNeo4jConfiguration.config()
.define(
diff --git a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt
index 35a24f886..1b58c2f8f 100644
--- a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt
+++ b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt
@@ -378,8 +378,9 @@ class SourceConfiguration(originals: Map<*, *>) :
private val DEFAULT_CDC_POLL_INTERVAL = 1.seconds
private val DEFAULT_CDC_POLL_DURATION = 5.seconds
+ @Suppress("DEPRECATION")
fun migrateSettings(oldSettings: Map): Map {
- val migrated = Neo4jConfiguration.migrateSettings(oldSettings, true).toMutableMap()
+ val migrated = migrateSettings(oldSettings, true).toMutableMap()
oldSettings.forEach {
when (it.key) {
@@ -416,7 +417,7 @@ class SourceConfiguration(originals: Map<*, *>) :
}
fun validate(config: Config, originals: Map) {
- Neo4jConfiguration.validate(config)
+ validate(config)
// START_FROM user defined validation
config.validateNonEmptyIfVisible(START_FROM_VALUE)
diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfigurationTest.kt b/source/src/test/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfigurationTest.kt
index 84fba5d90..e58b68fd4 100644
--- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfigurationTest.kt
+++ b/source/src/test/kotlin/org/neo4j/connectors/kafka/source/DeprecatedNeo4jSourceConfigurationTest.kt
@@ -23,6 +23,8 @@ import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
class DeprecatedNeo4jSourceConfigurationTest {
+
+ @Suppress("DEPRECATION")
@Test
fun `should not allow cdc as source type`() {
assertFailsWith(ConfigException::class) {
diff --git a/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtensionTest.kt b/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtensionTest.kt
index c095cda0c..3b0e4477f 100644
--- a/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtensionTest.kt
+++ b/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtensionTest.kt
@@ -36,7 +36,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.ParameterizedTest.DISPLAY_NAME_PLACEHOLDER
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mockito.any
-import org.mockito.kotlin.any
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.inOrder
@@ -161,6 +160,7 @@ class Neo4jSourceExtensionTest {
@ParameterizedTest(name = "$DISPLAY_NAME_PLACEHOLDER [{0}]")
@MethodSource("validMethods")
+ @Suppress("UNUSED_PARAMETER") // Kotlin compiler not smart enough to see name param is used
fun `resolves Session parameter`(name: String, method: KFunction) {
val session = mock()
val driver =
@@ -181,6 +181,7 @@ class Neo4jSourceExtensionTest {
@ParameterizedTest(name = "$DISPLAY_NAME_PLACEHOLDER [{0}]")
@MethodSource("validMethods")
+ @Suppress("UNUSED_PARAMETER") // Kotlin compiler not smart enough to see name param is used
fun `resolves consumer parameter`(name: String, method: KFunction) {
val consumer = mock>()
val extension = Neo4jSourceExtension(consumerFactory = { _, _ -> consumer })