Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito.kotlin</groupId>
<artifactId>mockito-kotlin</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import org.apache.kafka.connect.errors.ConnectException
import org.neo4j.connectors.kafka.configuration.helpers.ConfigUtils
import org.neo4j.connectors.kafka.configuration.helpers.Validators.validateNonEmptyIfVisible
import org.neo4j.connectors.kafka.configuration.helpers.parseSimpleString
import org.neo4j.connectors.kafka.utils.Telemetry.connectorInformation
import org.neo4j.connectors.kafka.utils.Telemetry.userAgent
import org.neo4j.driver.AccessMode
import org.neo4j.driver.AuthToken
import org.neo4j.driver.AuthTokens
Expand All @@ -44,9 +46,9 @@ import org.neo4j.driver.net.ServerAddress
import org.slf4j.Logger
import org.slf4j.LoggerFactory

enum class ConnectorType {
SINK,
SOURCE
enum class ConnectorType(val description: String) {
SINK("sink"),
SOURCE("source"),
}

enum class AuthenticationType {
Expand All @@ -61,6 +63,10 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
AbstractConfig(configDef, originals), Closeable {
private val logger: Logger = LoggerFactory.getLogger(Neo4jConfiguration::class.java)

private val deprecated: Boolean by lazy {
originalsStrings().getOrElse(DEPRECATED) { "false" }.toBoolean()
}

val database
get(): String = getString(DATABASE)

Expand Down Expand Up @@ -159,6 +165,7 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
}
}

config.withUserAgent(userAgent(type.description, deprecated, userAgentComment()))
config.withConnectionAcquisitionTimeout(
connectionAcquisitionTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
config.withConnectionTimeout(connectionTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -201,9 +208,26 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
return driver.session(config.build())
}

open fun txConfig(): TransactionConfig = TransactionConfig.empty()
open fun txConfig(): TransactionConfig =
TransactionConfig.builder()
.withMetadata(
buildMap {
this["app"] = connectorInformation(type.description, deprecated)

val metadata = telemetryData()
if (metadata.isNotEmpty()) {
this["metadata"] = metadata
}
})
.build()

open fun telemetryData(): Map<String, Any> = emptyMap()

open fun userAgentComment(): String = ""

companion object {
const val DEPRECATED = "neo4j.deprecated"

const val DEFAULT_MAX_RETRY_ATTEMPTS = 5
val DEFAULT_MAX_RETRY_DURATION = 30.seconds

Expand Down
107 changes: 107 additions & 0 deletions common/src/main/kotlin/org/neo4j/connectors/kafka/utils/Telemetry.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.utils

import org.apache.kafka.common.utils.AppInfoParser
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration
import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil
import org.neo4j.driver.Config

interface EnvironmentProvider {
fun get(env: String): String?
}

object Telemetry {
internal const val CONFLUENT_ENV: String = "CONFLUENT_ENV"
internal const val CONFLUENT_ENV_VALUE: String = "CCLOUD_CUSTOM_CONNECTOR"

private object SystemEnvironmentProvider : EnvironmentProvider {
override fun get(env: String): String? {
return System.getenv(env)
}
}

fun userAgent(
type: String,
legacy: Boolean = false,
comment: String = "",
provider: EnvironmentProvider = SystemEnvironmentProvider
): String {
return String.format(
"%s %s (%s) %s %s",
connectorInformation(
type, legacy, VersionUtil.version(Neo4jConfiguration::class.java), comment, provider),
kafkaConnectInformation(),
platform(),
neo4jDriverVersion(),
jreInformation(),
)
}

private fun runningInConfluentCloud(
provider: EnvironmentProvider = SystemEnvironmentProvider
): Boolean {
val value = provider.get(CONFLUENT_ENV)
if (value.isNullOrEmpty()) {
return false
}
return value.contentEquals(CONFLUENT_ENV_VALUE, true)
}

internal fun connectorInformation(
type: String,
legacy: Boolean,
version: String = "",
comment: String = "",
provider: EnvironmentProvider = SystemEnvironmentProvider
): String {
return String.format(
"%s-%s%s%s",
if (runningInConfluentCloud(provider)) "confluent-cloud" else "kafka",
if (legacy) "legacy-$type" else type,
if (version.isEmpty()) "" else "/$version",
if (comment.isEmpty()) "" else " ($comment)")
}

internal fun kafkaConnectInformation(): String {
return String.format("kafka-connect/%s", AppInfoParser.getVersion())
}

internal fun platform(): String {
return String.format(
"%s; %s; %s",
System.getProperty("os.name"),
System.getProperty("os.version"),
System.getProperty("os.arch"),
)
}

internal fun neo4jDriverVersion(): String {
return Config.defaultConfig().userAgent()
}

internal fun jreInformation(): String {
// this format loosely follows the Java driver's Bolt Agent format
return String.format(
"Java/%s (%s; %s; %s)",
System.getProperty("java.version"),
System.getProperty("java.vm.vendor"),
System.getProperty("java.vm.name"),
System.getProperty("java.vm.version"),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.utils

import io.kotest.matchers.shouldBe
import io.kotest.matchers.string.shouldStartWith
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.EnabledOnJre
import org.junit.jupiter.api.condition.EnabledOnOs
import org.junit.jupiter.api.condition.JRE
import org.junit.jupiter.api.condition.OS
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.neo4j.connectors.kafka.utils.Telemetry.CONFLUENT_ENV
import org.neo4j.connectors.kafka.utils.Telemetry.CONFLUENT_ENV_VALUE
import org.neo4j.connectors.kafka.utils.Telemetry.connectorInformation
import org.neo4j.connectors.kafka.utils.Telemetry.jreInformation
import org.neo4j.connectors.kafka.utils.Telemetry.neo4jDriverVersion
import org.neo4j.connectors.kafka.utils.Telemetry.platform

class TelemetryTest {

@Test
fun `should return driver version`() {
neo4jDriverVersion() shouldStartWith "neo4j-java"
}

@Test
@EnabledOnJre(JRE.JAVA_11)
fun `should return jre information on jre 11`() {
jreInformation() shouldStartWith "Java/11"
}

@Test
@EnabledOnJre(JRE.JAVA_17)
fun `should return jre information on jre 17`() {
jreInformation() shouldStartWith "Java/17"
}

@Test
@EnabledOnJre(JRE.JAVA_21)
fun `should return jre information on jre 21`() {
jreInformation() shouldStartWith "Java/21"
}

@Test
@EnabledOnOs(OS.MAC)
fun `should return platform`() {
platform() shouldStartWith "Mac OS X"
}

@Test
fun `should return connector information with kafka`() {
val provider = mock<EnvironmentProvider> { on { get(CONFLUENT_ENV) } doReturn null }

connectorInformation("sink", false, "", "", provider) shouldBe "kafka-sink"
connectorInformation("sink", false, "5.1.0", "", provider) shouldBe "kafka-sink/5.1.0"
connectorInformation("source", false, "", "", provider) shouldBe "kafka-source"
connectorInformation("source", false, "5.1.0", "", provider) shouldBe "kafka-source/5.1.0"
connectorInformation("sink", true, "", "", provider) shouldBe "kafka-legacy-sink"
connectorInformation("source", true, "", "", provider) shouldBe "kafka-legacy-source"
connectorInformation("sink", false, "", "cypher; node-pattern", provider) shouldBe
"kafka-sink (cypher; node-pattern)"
connectorInformation("sink", false, "5.1.0", "cypher; node-pattern", provider) shouldBe
"kafka-sink/5.1.0 (cypher; node-pattern)"
connectorInformation("source", false, "", "cdc", provider) shouldBe "kafka-source (cdc)"
connectorInformation("source", false, "5.1.0", "cdc", provider) shouldBe
"kafka-source/5.1.0 (cdc)"
connectorInformation("sink", true, "", "cypher", provider) shouldBe "kafka-legacy-sink (cypher)"
connectorInformation("source", true, "", "query", provider) shouldBe
"kafka-legacy-source (query)"
}

@Test
fun `should return connector information with confluent-cloud`() {
val provider =
mock<EnvironmentProvider> { on { get(CONFLUENT_ENV) } doReturn CONFLUENT_ENV_VALUE }

connectorInformation("sink", false, "", "", provider) shouldBe "confluent-cloud-sink"
connectorInformation("sink", false, "5.1.0", "", provider) shouldBe "confluent-cloud-sink/5.1.0"
connectorInformation("source", false, "", "", provider) shouldBe "confluent-cloud-source"
connectorInformation("source", false, "5.1.0", "", provider) shouldBe
"confluent-cloud-source/5.1.0"
connectorInformation("sink", true, "", "", provider) shouldBe "confluent-cloud-legacy-sink"
connectorInformation("source", true, "", "", provider) shouldBe "confluent-cloud-legacy-source"
connectorInformation("sink", false, "", "cypher; node-pattern", provider) shouldBe
"confluent-cloud-sink (cypher; node-pattern)"
connectorInformation("sink", false, "5.1.0", "cypher; node-pattern", provider) shouldBe
"confluent-cloud-sink/5.1.0 (cypher; node-pattern)"
connectorInformation("source", false, "", "cdc", provider) shouldBe
"confluent-cloud-source (cdc)"
connectorInformation("source", false, "5.1.0", "cdc", provider) shouldBe
"confluent-cloud-source/5.1.0 (cdc)"
connectorInformation("sink", true, "", "cypher", provider) shouldBe
"confluent-cloud-legacy-sink (cypher)"
connectorInformation("source", true, "", "query", provider) shouldBe
"confluent-cloud-legacy-source (query)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package streams.kafka.connect.sink
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration
import org.neo4j.connectors.kafka.sink.DeprecatedNeo4jSinkConfiguration
import org.neo4j.connectors.kafka.sink.DeprecatedNeo4jSinkTask
import org.neo4j.connectors.kafka.sink.SinkConfiguration
Expand All @@ -36,7 +37,10 @@ class Neo4jSinkConnector : SinkConnector() {

override fun start(props: MutableMap<String, String>?) {
settings = props!!
config = SinkConfiguration(SinkConfiguration.migrateSettings(settings))
config =
SinkConfiguration(
SinkConfiguration.migrateSettings(settings) +
mapOf(Neo4jConfiguration.DEPRECATED to "true"))
}

override fun stop() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package streams.kafka.connect.source
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.source.SourceConnector
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration
import org.neo4j.connectors.kafka.source.DeprecatedNeo4jSourceConfiguration
import org.neo4j.connectors.kafka.source.Neo4jQueryTask
import org.neo4j.connectors.kafka.source.SourceConfiguration
Expand All @@ -40,7 +41,10 @@ class Neo4jSourceConnector : SourceConnector() {

override fun start(props: MutableMap<String, String>?) {
settings = props!!
config = SourceConfiguration(SourceConfiguration.migrateSettings(settings))
config =
SourceConfiguration(
SourceConfiguration.migrateSettings(settings) +
mapOf(Neo4jConfiguration.DEPRECATED to "true"))
}

override fun stop() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.neo4j.cypherdsl.core.renderer.Configuration
import org.neo4j.cypherdsl.core.renderer.Dialect
import org.neo4j.cypherdsl.core.renderer.Renderer

class SinkConfiguration(originals: Map<*, *>) :
class SinkConfiguration(originals: Map<String, *>) :
Neo4jConfiguration(config(), originals, ConnectorType.SINK) {

val parallelBatches
Expand Down Expand Up @@ -98,6 +98,9 @@ class SinkConfiguration(originals: Map<*, *>) :
validateAllTopics(originals)
}

override fun userAgentComment(): String =
SinkStrategyHandler.configuredStrategies(this).sorted().joinToString("; ")

private fun validateAllTopics(originals: Map<*, *>) {
TopicUtils.validate<ConfigException>(this.topics)
val topics =
Expand Down
Loading