Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
<packaging>jar</packaging>
<name>common</name>
<description>Neo4j Connector for Kafka - Common</description>
<properties>
<maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.neo4j.connectors.kafka.configuration

import com.fasterxml.jackson.databind.util.ClassUtil.defaultValue
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
Expand All @@ -37,17 +36,12 @@ object ConfigGroup {
const val CONNECTION = "Connection"
const val AUTHENTICATION = "Authentication"
const val TOPIC_CYPHER_MAPPING = "Topic Cypher Mapping"
const val ERROR_REPORTING = "Error Reporting"
const val BATCH = "Batch Management"
const val RETRY = "Retry Strategy"
const val DEPRECATED = "Deprecated Properties (please check the documentation)"
}

open class DeprecatedNeo4jConfiguration(
configDef: ConfigDef,
originals: Map<*, *>,
private val type: ConnectorType
) : AbstractConfig(configDef, originals) {
open class DeprecatedNeo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>) :
AbstractConfig(configDef, originals) {

companion object {
@Deprecated("deprecated in favour of ${Neo4jConfiguration.URI}")
Expand Down Expand Up @@ -99,6 +93,7 @@ open class DeprecatedNeo4jConfiguration(
val CONNECTION_MAX_CONNECTION_LIFETIME_MSECS_DEFAULT = Duration.ofMinutes(8).toMillis()
val CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS_DEFAULT = Duration.ofMinutes(2).toMillis()

@Suppress("DEPRECATION")
fun config(): ConfigDef =
ConfigDef()
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
val migrated = mutableMapOf<String, String>()

oldSettings.forEach {
@Suppress("DEPRECATION")
when (it.key) {
DeprecatedNeo4jConfiguration.SERVER_URI -> migrated[URI] = it.value.toString()
DeprecatedNeo4jConfiguration.CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.utils.PropertiesUtil

object ConfigUtils {
inline fun <reified E : Enum<E>> getEnum(config: AbstractConfig, key: String): E {
return enumValueOf<E>(config.getString(key))
inline fun <reified E : Enum<E>> getEnum(config: AbstractConfig, key: String): E? {
return try {
enumValueOf<E>(config.getString(key))
} catch (e: IllegalArgumentException) {
null
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD : Payload> :
private fun convertPoints(recordChange: RecordChange?, points: Set<String>) =
recordChange?.properties?.mapValues {
if (points.contains(it.key)) {
val pointMap = it.value as Map<String, Any>
val pointMap = it.value as Map<*, *>
when (pointMap["crs"]) {
"cartesian" ->
Values.point(
Expand Down Expand Up @@ -301,14 +301,6 @@ object JSONUtils {
return OBJECT_MAPPER.writeValueAsString(any)
}

fun writeValueAsBytes(any: Any): ByteArray {
return OBJECT_MAPPER.writeValueAsBytes(any)
}

inline fun <reified T> readValue(value: ByteArray): T {
return getObjectMapper().readValue(value, T::class.java)
}

inline fun <reified T> readValue(
value: Any,
stringWhenFailure: Boolean = false,
Expand All @@ -325,7 +317,6 @@ object JSONUtils {
val strValue =
when (value) {
is ByteArray -> String(value)
null -> ""
else -> value.toString()
}
strValue.trimStart().let { if (it[0] == '{' || it[0] == '[') throw e else it as T }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("DEPRECATION")

package streams.kafka.connect.sink

import org.apache.kafka.common.config.ConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("DEPRECATION")

package streams.kafka.connect.source

import org.apache.kafka.common.config.ConfigDef
Expand Down
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.3.1</version>
<configuration>
<propertiesEncoding>${project.build.sourceEncoding}</propertiesEncoding>
</configuration>
</plugin>
<plugin>
<groupId>org.jreleaser</groupId>
<artifactId>jreleaser-maven-plugin</artifactId>
Expand Down Expand Up @@ -295,6 +303,7 @@
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<configuration>
<args>-Werror</args>
<jvmTarget>${java.version}</jvmTarget>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package org.neo4j.connectors.kafka.sink

import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.configuration.ConfigGroup
import org.neo4j.connectors.kafka.configuration.ConnectorType
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.ConfigKeyBuilder
import org.neo4j.connectors.kafka.service.sink.strategy.SourceIdIngestionStrategyConfig
import org.neo4j.connectors.kafka.utils.PropertiesUtil

@Deprecated("use org.neo4j.connectors.kafka.sink.SinkConfiguration")
class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jConfiguration(config(), originals, ConnectorType.SINK) {
DeprecatedNeo4jConfiguration(config(), originals) {

companion object {

Expand Down Expand Up @@ -57,10 +56,11 @@ class DeprecatedNeo4jSinkConfiguration(originals: Map<*, *>) :
@Deprecated("deprecated in favour of ${SinkConfiguration.CUD_TOPICS}")
const val TOPIC_CUD = "neo4j.topic.cud"

const val DEFAULT_BATCH_PARALLELIZE = true
const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false
private const val DEFAULT_BATCH_PARALLELIZE = true
private const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
private const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false

@Suppress("DEPRECATION")
fun config(): ConfigDef =
DeprecatedNeo4jConfiguration.config()
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.neo4j.connectors.kafka.sink

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil
Expand Down Expand Up @@ -47,6 +49,7 @@ class Neo4jSinkTask : SinkTask() {
log::error)
}

@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
override fun put(collection: Collection<SinkRecord>) {
if (collection.isEmpty()) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ class SinkConfiguration(originals: Map<*, *>) :
const val PATTERN_RELATIONSHIP_MERGE_PROPERTIES = "neo4j.pattern.relationship.merge-properties"
const val CUD_TOPICS = "neo4j.cud.topics"

const val DEFAULT_BATCH_SIZE = 1000
private const val DEFAULT_BATCH_SIZE = 1000
val DEFAULT_BATCH_TIMEOUT = 0.seconds
const val DEFAULT_BATCH_PARALLELIZE = true
const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false
private const val DEFAULT_BATCH_PARALLELIZE = true
private const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES = false
private const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES = false

@Suppress("DEPRECATION")
@JvmStatic
val KEY_REPLACEMENTS =
mapOf(
Expand All @@ -108,8 +109,9 @@ class SinkConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jSinkConfiguration.TOPIC_PATTERN_RELATIONSHIP_PREFIX to
PATTERN_RELATIONSHIP_TOPIC_PREFIX)

@Suppress("DEPRECATION")
fun migrateSettings(oldSettings: Map<String, Any>): Map<String, String> {
val migratedBase = Neo4jConfiguration.migrateSettings(oldSettings, false)
val migratedBase = migrateSettings(oldSettings, false)
val migrated = HashMap<String, String>(migratedBase.size)

migratedBase.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ open class MapValueConverter<T> : ValueConverter<MutableMap<String, T?>>() {
result: MutableMap<String, T?>?,
fieldName: String,
schema: Schema?,
value: MutableMap<Any?, Any?>?
map: MutableMap<Any?, Any?>?
) {
if (value != null) {
val converted = convert(value) as MutableMap<Any?, Any?>
if (map != null) {
val converted = convert(map) as MutableMap<Any?, Any?>
setValue(result, fieldName, converted)
} else {
setNullField(result, fieldName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class SinkConfigurationTest {
assertEquals(setOf("foo", "bar"), config.topics.cudTopics)
}

@Suppress("DEPRECATION")
@Test
fun `migrateSettings should replace deprecated settings with up-to-date equivalent`() {
val originals =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package org.neo4j.connectors.kafka.source

import org.apache.kafka.common.config.ConfigDef
import org.neo4j.connectors.kafka.configuration.ConnectorType
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.ConfigKeyBuilder
import org.neo4j.connectors.kafka.configuration.helpers.Recommenders
import org.neo4j.connectors.kafka.configuration.helpers.Validators

@Deprecated("use org.neo4j.connectors.kafka.source.SourceConfiguration")
class DeprecatedNeo4jSourceConfiguration(originals: Map<*, *>) :
DeprecatedNeo4jConfiguration(config(), originals, ConnectorType.SOURCE) {
DeprecatedNeo4jConfiguration(config(), originals) {

enum class StreamingFrom {
ALL,
Expand All @@ -48,6 +47,7 @@ class DeprecatedNeo4jSourceConfiguration(originals: Map<*, *>) :
@Deprecated("deprecated in favour of ${SourceConfiguration.QUERY}")
const val SOURCE_TYPE_QUERY = "neo4j.source.query"

@Suppress("DEPRECATION")
fun config(): ConfigDef =
DeprecatedNeo4jConfiguration.config()
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ class SourceConfiguration(originals: Map<*, *>) :
private val DEFAULT_CDC_POLL_INTERVAL = 1.seconds
private val DEFAULT_CDC_POLL_DURATION = 5.seconds

@Suppress("DEPRECATION")
fun migrateSettings(oldSettings: Map<String, Any>): Map<String, String> {
val migrated = Neo4jConfiguration.migrateSettings(oldSettings, true).toMutableMap()
val migrated = migrateSettings(oldSettings, true).toMutableMap()

oldSettings.forEach {
when (it.key) {
Expand Down Expand Up @@ -416,7 +417,7 @@ class SourceConfiguration(originals: Map<*, *>) :
}

fun validate(config: Config, originals: Map<String, String>) {
Neo4jConfiguration.validate(config)
validate(config)

// START_FROM user defined validation
config.validateNonEmptyIfVisible(START_FROM_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.configuration.DeprecatedNeo4jConfiguration

class DeprecatedNeo4jSourceConfigurationTest {

@Suppress("DEPRECATION")
@Test
fun `should not allow cdc as source type`() {
assertFailsWith(ConfigException::class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.ParameterizedTest.DISPLAY_NAME_PLACEHOLDER
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mockito.any
import org.mockito.kotlin.any
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.inOrder
Expand Down Expand Up @@ -161,6 +160,7 @@ class Neo4jSourceExtensionTest {

@ParameterizedTest(name = "$DISPLAY_NAME_PLACEHOLDER [{0}]")
@MethodSource("validMethods")
@Suppress("UNUSED_PARAMETER") // Kotlin compiler not smart enough to see name param is used
fun `resolves Session parameter`(name: String, method: KFunction<Unit>) {
val session = mock<Session>()
val driver =
Expand All @@ -181,6 +181,7 @@ class Neo4jSourceExtensionTest {

@ParameterizedTest(name = "$DISPLAY_NAME_PLACEHOLDER [{0}]")
@MethodSource("validMethods")
@Suppress("UNUSED_PARAMETER") // Kotlin compiler not smart enough to see name param is used
fun `resolves consumer parameter`(name: String, method: KFunction<Unit>) {
val consumer = mock<KafkaConsumer<String, GenericRecord>>()
val extension = Neo4jSourceExtension(consumerFactory = { _, _ -> consumer })
Expand Down