diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt index 4606b6b59..62ec1ecc7 100644 --- a/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt @@ -20,6 +20,7 @@ import java.io.Closeable import java.io.File import java.net.URI import java.util.concurrent.TimeUnit +import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import org.apache.kafka.common.config.AbstractConfig @@ -70,33 +71,29 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty get(): List = getList(URI).map { URI(it) } internal val connectionTimeout - get(): kotlin.time.Duration = - kotlin.time.Duration.parseSimpleString(getString(CONNECTION_TIMEOUT)) + get(): Duration = Duration.parseSimpleString(getString(CONNECTION_TIMEOUT)) internal val maxRetryTime - get(): kotlin.time.Duration = - kotlin.time.Duration.parseSimpleString(getString(MAX_TRANSACTION_RETRY_TIMEOUT)) + get(): Duration = Duration.parseSimpleString(getString(MAX_TRANSACTION_RETRY_TIMEOUT)) internal val maxConnectionPoolSize get(): Int = getInt(POOL_MAX_CONNECTION_POOL_SIZE) internal val connectionAcquisitionTimeout - get(): kotlin.time.Duration = - kotlin.time.Duration.parseSimpleString(getString(POOL_CONNECTION_ACQUISITION_TIMEOUT)) + get(): Duration = Duration.parseSimpleString(getString(POOL_CONNECTION_ACQUISITION_TIMEOUT)) internal val idleTimeBeforeTest - get(): kotlin.time.Duration = + get(): Duration = getString(POOL_IDLE_TIME_BEFORE_TEST).orEmpty().run { if (this.isEmpty()) { (-1).milliseconds } else { - kotlin.time.Duration.parseSimpleString(this) + Duration.parseSimpleString(this) } } internal val maxConnectionLifetime - get(): kotlin.time.Duration = - kotlin.time.Duration.parseSimpleString(getString(POOL_MAX_CONNECTION_LIFETIME)) + get(): Duration = Duration.parseSimpleString(getString(POOL_MAX_CONNECTION_LIFETIME)) internal val encrypted get(): Boolean = getString(SECURITY_ENCRYPTED).toBoolean() @@ -239,6 +236,12 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty } } + val connectorName + get(): String = originals()[CONNECTOR_NAME].toString() + + val taskId + get(): String = originals()[TASK_ID].toString() + companion object { val DEFAULT_MAX_RETRY_DURATION = 30.seconds @@ -271,6 +274,10 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty const val SECURITY_TRUST_STRATEGY = "neo4j.security.trust-strategy" const val SECURITY_CERT_FILES = "neo4j.security.cert-files" + // internal properties + const val CONNECTOR_NAME = "name" + const val TASK_ID = "neo4j.task.id" + /** Perform validation on dependent configuration items */ fun validate(config: org.apache.kafka.common.config.Config) { // authentication configuration diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/CdcMetricsData.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/CdcMetricsData.kt new file mode 100644 index 000000000..78e9707b4 --- /dev/null +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/CdcMetricsData.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.metrics + +import java.util.concurrent.atomic.AtomicLong +import org.neo4j.cdc.client.model.ChangeEvent +import org.neo4j.connectors.kafka.configuration.ConnectorType + +class CdcMetricsData( + metrics: Metrics, + connectorType: ConnectorType, + tags: LinkedHashMap = linkedMapOf(), +) { + + private var lastTxCommitTs: AtomicLong = AtomicLong(0L) + private var lastTxStartTs: AtomicLong = AtomicLong(0L) + private var lastTxId: AtomicLong = AtomicLong(0L) + + init { + metrics.addGauge( + "last_cdc_tx_commit_timestamp", + "The transaction commit timestamp of the last ${connectorType.descriptionActionVerb()} CDC message", + tags, + ) { + lastTxCommitTs.get() + } + metrics.addGauge( + "last_cdc_tx_start_timestamp", + "The transaction start timestamp of the last ${connectorType.descriptionActionVerb()} CDC message", + tags, + ) { + lastTxStartTs.get() + } + metrics.addGauge( + "last_cdc_tx_id", + "The transaction id of the last ${connectorType.descriptionActionVerb()} CDC message", + tags, + ) { + lastTxId.get() + } + } + + fun update(event: ChangeEvent) { + event.metadata?.let { + lastTxCommitTs.set(it.txCommitTime.toEpochSecond()) + lastTxStartTs.set(it.txStartTime.toEpochSecond()) + } + lastTxId.set(event.txId) + } + + companion object { + private fun ConnectorType.descriptionActionVerb(): String = + when (this) { + ConnectorType.SOURCE -> "polled" + ConnectorType.SINK -> "pushed" + } + } +} diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsData.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsData.kt new file mode 100644 index 000000000..9ebefa12c --- /dev/null +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsData.kt @@ -0,0 +1,89 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.metrics + +import java.io.Closeable +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.neo4j.driver.Driver +import org.neo4j.driver.TransactionConfig +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class DbTransactionMetricsData( + metrics: Metrics, + tags: LinkedHashMap = linkedMapOf(), + refreshInterval: Duration, + neo4jDriver: Driver, + transactionConfig: TransactionConfig, + databaseName: String, + dispatcher: CoroutineDispatcher = Dispatchers.Default, +) : Closeable { + + private val lastTransactionId = AtomicLong(0) + private val scope = CoroutineScope(dispatcher + Job()) + + init { + metrics.addGauge("last_db_tx_id", "The last committed transaction id in the database", tags) { + lastTransactionId.get() + } + + scope.launch { + while (isActive) { + try { + val explicitDatabaseName = databaseName.ifBlank { "neo4j" } + val txId: Long = + neo4jDriver.session().use { session -> + session.writeTransaction( + { tx -> + tx.run( + "SHOW DATABASE ${"$"}dbName YIELD lastCommittedTxn RETURN lastCommittedTxn as txId", + mapOf("dbName" to explicitDatabaseName), + ) + .single() + .get("txId") + .asLong() + }, + transactionConfig, + ) + } + lastTransactionId.set(txId) + } catch (e: Throwable) { + log.warn("Unexpected error occurred while fetching last committed transaction id", e) + } + + delay(refreshInterval) + } + } + } + + override fun close() { + scope.cancel() + } + + companion object { + private val log: Logger = LoggerFactory.getLogger(DbTransactionMetricsData::class.java) + } +} diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/JmxMetrics.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/JmxMetrics.kt new file mode 100644 index 000000000..73e811fb4 --- /dev/null +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/JmxMetrics.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.metrics + +import java.lang.management.ManagementFactory +import java.util.Hashtable +import java.util.concurrent.ConcurrentHashMap +import javax.management.Attribute +import javax.management.AttributeList +import javax.management.DynamicMBean +import javax.management.MBeanAttributeInfo +import javax.management.MBeanInfo +import javax.management.ObjectName +import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration + +class JmxMetrics(config: Neo4jConfiguration) : Metrics, DynamicMBean { + + private val connectorName: String = config.connectorName + private val taskId: String = config.taskId + private val objectName: ObjectName = + ObjectName( + "kafka.connect", + Hashtable().apply { + put("type", "plugins") + put("connector", connectorName) + put("task", taskId) + }, + ) + + private val gauges = ConcurrentHashMap>() + private val mbs = ManagementFactory.getPlatformMBeanServer() + + init { + if (mbs.isRegistered(objectName)) { + mbs.unregisterMBean(objectName) + } + mbs.registerMBean(this, objectName) + } + + override fun addGauge( + name: String, + description: String, + tags: LinkedHashMap, + valueProvider: () -> T?, + ) { + gauges[name] = Gauge(name, description, valueProvider) + } + + override fun getAttribute(attribute: String?): Any? { + return gauges[attribute]?.valueProvider?.invoke() + } + + override fun setAttribute(attribute: Attribute?) { + throw UnsupportedOperationException("Attributes are read-only") + } + + override fun getAttributes(attributes: Array?): AttributeList { + val list = AttributeList() + attributes?.forEach { name -> + getAttribute(name)?.let { value -> list.add(Attribute(name, value)) } + } + return list + } + + override fun setAttributes(attributes: AttributeList?): AttributeList { + throw UnsupportedOperationException("Attributes are read-only") + } + + override fun invoke( + actionName: String?, + params: Array?, + signature: Array?, + ): Any { + throw UnsupportedOperationException("Operations are not supported") + } + + override fun getMBeanInfo(): MBeanInfo { + val attrs = + gauges.values.map { gauge -> + MBeanAttributeInfo(gauge.name, "java.lang.Number", gauge.description, true, false, false) + } + + return MBeanInfo( + this.javaClass.name, + "Neo4j Kafka Connector JMX Metrics", + attrs.toTypedArray(), + null, + null, + null, + ) + } + + private class Gauge( + val name: String, + val description: String, + val valueProvider: () -> T?, + ) + + override fun close() { + if (mbs.isRegistered(objectName)) { + mbs.unregisterMBean(objectName) + } + } +} diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/MetricsFactory.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/MetricsFactory.kt new file mode 100644 index 000000000..3278a2cba --- /dev/null +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/MetricsFactory.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.metrics + +import java.io.Closeable +import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration + +class MetricsFactory { + + fun createMetrics(config: Neo4jConfiguration): Metrics { + return JmxMetrics(config) + } +} + +interface Metrics : Closeable { + fun addGauge( + name: String, + description: String, + tags: LinkedHashMap, + valueProvider: () -> T?, + ) +} diff --git a/common/src/main/resources/neo4j-source-configuration.properties b/common/src/main/resources/neo4j-source-configuration.properties index ab6bc8572..83d5fa39f 100644 --- a/common/src/main/resources/neo4j-source-configuration.properties +++ b/common/src/main/resources/neo4j-source-configuration.properties @@ -31,3 +31,6 @@ neo4j.cdc.use-leader=Type: Boolean;\nDescription: Whether to use leader for chan neo4j.cdc.poll-duration=Type: Duration;\nDescription: Maximum amount of time Kafka Connect poll request will wait for a change to be received from the database (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`). neo4j.cdc.poll-interval=Type: Duration;\nDescription: The interval in which the database will be polled for changes (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`). neo4j.payload-mode=Type: Enum;\nDescription: Defines the structure of change messages generated. `COMPACT` produces messages which are simpler but with potential schema compatibility and type safety issues, while `EXTENDED` produces messages with extra type information which removes the limitations of `COMPACT` mode. For example, a property type change will lead to schema compatibility failures in `COMPACT` mode. `RAW_JSON_STRING` mode generates messages which are serialized as raw JSON strings. Default is `EXTENDED`. +neo4j.cdc.metric.last-db-tx-id.enabled=Type: Boolean;\nDescription: Whether to enable the `last_db_tx_id` metric. +neo4j.cdc.metric.last-db-tx-id.refresh-interval=Type: Duration;\nDescription: The refresh interval of the `last_db_tx_id` metric (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`). The default value is 30 seconds. + diff --git a/common/src/test/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfigurationTest.kt b/common/src/test/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfigurationTest.kt index f1cdc8ed4..7b5b2cfa7 100644 --- a/common/src/test/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfigurationTest.kt +++ b/common/src/test/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfigurationTest.kt @@ -361,6 +361,23 @@ class Neo4jConfigurationTest { } } + @Test + fun `internal variables`() { + Neo4jConfiguration( + Neo4jConfiguration.config(), + mapOf( + Neo4jConfiguration.URI to "bolt://localhost", + Neo4jConfiguration.TASK_ID to "1", + Neo4jConfiguration.CONNECTOR_NAME to "neo4j-connector", + ), + ConnectorType.SINK, + ) + .run { + assertEquals("1", this.taskId) + assertEquals("neo4j-connector", this.connectorName) + } + } + companion object { fun newTempFile(prefix: String = "test", suffix: String = ".tmp"): File { val f = File.createTempFile(prefix, suffix) diff --git a/common/src/test/kotlin/org/neo4j/connectors/kafka/data/converter/CompactValueConverterTest.kt b/common/src/test/kotlin/org/neo4j/connectors/kafka/data/converter/CompactValueConverterTest.kt index dd5742fd7..45181bcf1 100644 --- a/common/src/test/kotlin/org/neo4j/connectors/kafka/data/converter/CompactValueConverterTest.kt +++ b/common/src/test/kotlin/org/neo4j/connectors/kafka/data/converter/CompactValueConverterTest.kt @@ -608,8 +608,6 @@ class DynamicTypesCompactTest { val schema = converter.schema(map, false) val converted = converter.value(schema, map) - println(schema) - println(converted) converted shouldBe Struct(schema) .put("name", "john") diff --git a/common/src/test/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsDataTest.kt b/common/src/test/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsDataTest.kt new file mode 100644 index 000000000..7b6492e45 --- /dev/null +++ b/common/src/test/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsDataTest.kt @@ -0,0 +1,167 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.metrics + +import java.util.UUID +import kotlin.test.assertEquals +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.milliseconds +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertNotNull +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.eq +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify +import org.neo4j.connectors.kafka.data.TypesTest.Companion.neo4jImage +import org.neo4j.driver.AuthTokens +import org.neo4j.driver.Driver +import org.neo4j.driver.GraphDatabase +import org.neo4j.driver.TransactionConfig +import org.testcontainers.containers.Neo4jContainer +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers + +@OptIn(ExperimentalCoroutinesApi::class) +@Testcontainers +class DbTransactionMetricsDataTest { + + @Test + fun `should register gauge and poll for transaction id`() = runTest { + val metrics = mock() + + val refreshInterval = 100.milliseconds + val transactionConfig = TransactionConfig.builder().build() + + val driver = GraphDatabase.driver(neo4j.boltUrl, AuthTokens.none()) + val dispatcher = this.coroutineContext[kotlinx.coroutines.CoroutineDispatcher]!! + + DbTransactionMetricsData( + metrics = metrics, + refreshInterval = refreshInterval, + neo4jDriver = driver, + databaseName = "", + transactionConfig = transactionConfig, + dispatcher = dispatcher, + ) + .use { + val gaugeCaptor = argumentCaptor<() -> Long?>() + verify(metrics) + .addGauge( + eq("last_db_tx_id"), + eq("The last committed transaction id in the database"), + any(), + gaugeCaptor.capture(), + ) + + val initialValue = gaugeCaptor.firstValue() + assertNotNull(initialValue, "captured transaction id should not be null") + + commitTransaction(driver) + runCurrent() + + val firstIncrement = gaugeCaptor.firstValue() + assertNotNull(firstIncrement, "the first increment of transaction id should not be null") + assertTrue("transaction id should increment") { initialValue < firstIncrement } + + commitTransaction(driver) + advanceTimeBy(refreshInterval) + runCurrent() + + val secondIncrement = gaugeCaptor.firstValue() + assertNotNull( + secondIncrement, + "the second increment of transaction id should not be null", + ) + assertEquals(firstIncrement + 1, secondIncrement, "transaction id should increment") + } + } + + @Test + fun `should stop polling when stop is called`() = runTest { + val metrics = mock() + + val refreshInterval = 100.milliseconds + val transactionConfig = TransactionConfig.builder().build() + + val driver = GraphDatabase.driver(neo4j.boltUrl, AuthTokens.none()) + val dispatcher = this.coroutineContext[kotlinx.coroutines.CoroutineDispatcher]!! + + DbTransactionMetricsData( + metrics = metrics, + refreshInterval = refreshInterval, + neo4jDriver = driver, + databaseName = "", + transactionConfig = transactionConfig, + dispatcher = dispatcher, + ) + .use { data -> + val gaugeCaptor = argumentCaptor<() -> Long?>() + verify(metrics) + .addGauge( + eq("last_db_tx_id"), + eq("The last committed transaction id in the database"), + any(), + gaugeCaptor.capture(), + ) + + val initialValue = gaugeCaptor.firstValue() + assertNotNull(initialValue, "captured transaction id should not be null") + + commitTransaction(driver) + runCurrent() + + val firstIncrement = gaugeCaptor.firstValue() + assertNotNull(firstIncrement, "the first increment of transaction id should not be null") + assertTrue("transaction id should increment") { initialValue < firstIncrement } + + data.close() + commitTransaction(driver) + advanceTimeBy(refreshInterval) + runCurrent() + + val secondIncrement = gaugeCaptor.firstValue() + assertNotNull( + secondIncrement, + "the second increment of transaction id should not be null", + ) + assertEquals(firstIncrement, secondIncrement, "transaction id should not increment") + } + } + + companion object { + + @Container + val neo4j: Neo4jContainer<*> = + Neo4jContainer(neo4jImage()) + .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") + .withExposedPorts(7687) + .withoutAuthentication() + + private fun commitTransaction(driver: Driver) { + driver.session().use { session -> + session + .run("CREATE (n: TestNode {id: ${'$'}id})", mapOf("id" to UUID.randomUUID().toString())) + .consume() + } + } + } +} diff --git a/common/src/test/kotlin/org/neo4j/connectors/kafka/metrics/JmxMetricsTest.kt b/common/src/test/kotlin/org/neo4j/connectors/kafka/metrics/JmxMetricsTest.kt new file mode 100644 index 000000000..68143c667 --- /dev/null +++ b/common/src/test/kotlin/org/neo4j/connectors/kafka/metrics/JmxMetricsTest.kt @@ -0,0 +1,132 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.metrics + +import java.lang.management.ManagementFactory +import javax.management.MBeanServer +import javax.management.ObjectName +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertTrue +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration + +class JmxMetricsTest { + + private val config = + mock { + on { connectorName } doReturn "my-connector" + on { taskId } doReturn "0" + } + + private lateinit var metrics: JmxMetrics + + @AfterEach + fun tearDown() { + metrics.close() + } + + @Test + fun `should register MBean on initialization`() { + // given MBean is not registered + assertFalse(mbs.isRegistered(objectName)) + + // when + metrics = JmxMetrics(config) + + // then + assertTrue(mbs.isRegistered(objectName)) + } + + @Test + fun `should unregister MBean on close`() { + // given + metrics = JmxMetrics(config) + assertTrue(mbs.isRegistered(objectName)) + + // when + metrics.close() + + // then + assertFalse(mbs.isRegistered(objectName)) + } + + @Test + fun `should provide gauge value via JMX`() { + // given + metrics = JmxMetrics(config) + + // when + metrics.addGauge("test_gauge", "A test gauge", linkedMapOf()) { 42 } + + // then + val value = mbs.getAttribute(objectName, "test_gauge") + assertEquals(42, value) + } + + @Test + fun `should expose MBean info with attributes`() { + // given + metrics = JmxMetrics(config) + + // when + metrics.addGauge("gauge_1", "Description 1", linkedMapOf()) { 1 } + metrics.addGauge("gauge_2", "Description 2", linkedMapOf()) { 2 } + + // then + val info = metrics.getMBeanInfo() + assertNotNull(info) + assertEquals(2, info.attributes.size) + + val attr1 = info.attributes.find { it.name == "gauge_1" } + assertNotNull(attr1) + assertEquals("Description 1", attr1.description) + assertEquals("java.lang.Number", attr1.type) + + val attr2 = info.attributes.find { it.name == "gauge_2" } + assertNotNull(attr2) + assertEquals("Description 2", attr2.description) + assertEquals("java.lang.Number", attr2.type) + } + + @Test + fun `should handle re-registration`() { + // given + metrics = JmxMetrics(config) + assertTrue(mbs.isRegistered(objectName)) + + // create another instance with the same name, should unregister old and register new + val metrics2 = JmxMetrics(config) + assertTrue(mbs.isRegistered(objectName)) + + metrics2.close() + assertFalse(mbs.isRegistered(objectName)) + + // original metrics close should not fail even if already unregistered + metrics.close() + } + + companion object { + val mbs: MBeanServer = ManagementFactory.getPlatformMBeanServer() + val objectName: ObjectName = + ObjectName("kafka.connect:type=plugins,connector=my-connector,task=0") + } +} diff --git a/packaging/pom.xml b/packaging/pom.xml index 374676651..cecf8697a 100644 --- a/packaging/pom.xml +++ b/packaging/pom.xml @@ -52,6 +52,11 @@ junit-jupiter test + + org.mockito.kotlin + mockito-kotlin + test + diff --git a/packaging/src/test/kotlin/ConfigPropertiesTest.kt b/packaging/src/test/kotlin/ConfigPropertiesTest.kt index 9eac3ae0a..2e855f137 100644 --- a/packaging/src/test/kotlin/ConfigPropertiesTest.kt +++ b/packaging/src/test/kotlin/ConfigPropertiesTest.kt @@ -26,12 +26,14 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mockito.mock import org.neo4j.caniuse.Neo4j import org.neo4j.caniuse.Neo4jDeploymentType import org.neo4j.caniuse.Neo4jEdition import org.neo4j.caniuse.Neo4jVersion import org.neo4j.cdc.client.selector.NodeSelector import org.neo4j.cdc.client.selector.RelationshipSelector +import org.neo4j.connectors.kafka.metrics.Metrics import org.neo4j.connectors.kafka.sink.SinkConfiguration import org.neo4j.connectors.kafka.sink.SinkStrategyHandler import org.neo4j.connectors.kafka.sink.strategy.CudHandler @@ -45,6 +47,8 @@ import org.neo4j.cypherdsl.core.renderer.Renderer class ConfigPropertiesTest { + private val metricsMock: Metrics = mock() + // This test checks that the number of config files is fixed. // If a new file is added, the test will fail, reminding the developer to update this test and add // unit tests for the new file. @@ -69,10 +73,11 @@ class ConfigPropertiesTest { SinkConfiguration(properties, Renderer.getDefaultRenderer(), neo4j, apocDoITAvailable) } - config.topicHandlers.keys shouldBe setOf("creates", "updates", "deletes") - config.topicHandlers["creates"] shouldBe instanceOf(expectedHandlerType) - config.topicHandlers["updates"] shouldBe instanceOf(expectedHandlerType) - config.topicHandlers["deletes"] shouldBe instanceOf(expectedHandlerType) + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers.keys shouldBe setOf("creates", "updates", "deletes") + topicHandlers["creates"] shouldBe instanceOf(expectedHandlerType) + topicHandlers["updates"] shouldBe instanceOf(expectedHandlerType) + topicHandlers["deletes"] shouldBe instanceOf(expectedHandlerType) } @ParameterizedTest @@ -90,10 +95,11 @@ class ConfigPropertiesTest { SinkConfiguration(properties, Renderer.getDefaultRenderer(), neo4j, apocDoITAvailable) } - config.topicHandlers.keys shouldBe setOf("creates", "updates", "deletes") - config.topicHandlers["creates"] shouldBe instanceOf(expectedHandlerType) - config.topicHandlers["updates"] shouldBe instanceOf(expectedHandlerType) - config.topicHandlers["deletes"] shouldBe instanceOf(expectedHandlerType) + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers.keys shouldBe setOf("creates", "updates", "deletes") + topicHandlers["creates"] shouldBe instanceOf(expectedHandlerType) + topicHandlers["updates"] shouldBe instanceOf(expectedHandlerType) + topicHandlers["deletes"] shouldBe instanceOf(expectedHandlerType) } @Test @@ -104,8 +110,9 @@ class ConfigPropertiesTest { val config = shouldNotThrowAny { SinkConfiguration(properties, Renderer.getDefaultRenderer()) } - config.topicHandlers.keys shouldBe setOf("people") - config.topicHandlers["people"].shouldBeInstanceOf() + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers.keys shouldBe setOf("people") + topicHandlers["people"].shouldBeInstanceOf() } @Test @@ -116,8 +123,9 @@ class ConfigPropertiesTest { val config = shouldNotThrowAny { SinkConfiguration(properties, Renderer.getDefaultRenderer()) } - config.topicHandlers.keys shouldBe setOf("people") - config.topicHandlers["people"].shouldBeInstanceOf() + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers.keys shouldBe setOf("people") + topicHandlers["people"].shouldBeInstanceOf() } @Test @@ -128,8 +136,9 @@ class ConfigPropertiesTest { val config = shouldNotThrowAny { SinkConfiguration(properties, Renderer.getDefaultRenderer()) } - config.topicHandlers.keys shouldBe setOf("people") - config.topicHandlers["people"].shouldBeInstanceOf() + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers.keys shouldBe setOf("people") + topicHandlers["people"].shouldBeInstanceOf() } @Test @@ -140,8 +149,9 @@ class ConfigPropertiesTest { val config = shouldNotThrowAny { SinkConfiguration(properties, Renderer.getDefaultRenderer()) } - config.topicHandlers.keys shouldBe setOf("knows") - config.topicHandlers["knows"].shouldBeInstanceOf() + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers.keys shouldBe setOf("knows") + topicHandlers["knows"].shouldBeInstanceOf() } @Test diff --git a/sink-connector/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jConnector.kt b/sink-connector/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jConnector.kt index d85e499cd..8378f2e98 100644 --- a/sink-connector/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jConnector.kt +++ b/sink-connector/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jConnector.kt @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.Config import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.connect.connector.Task import org.apache.kafka.connect.sink.SinkConnector +import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration.Companion.TASK_ID import org.neo4j.connectors.kafka.utils.PropertiesUtil class Neo4jConnector : SinkConnector() { @@ -27,13 +28,19 @@ class Neo4jConnector : SinkConnector() { override fun version(): String = PropertiesUtil.getVersion() - override fun start(props: MutableMap?) { - this.props = props!!.toMap() + override fun start(props: MutableMap) { + this.props = props.toMap() } override fun taskClass(): Class = Neo4jSinkTask::class.java - override fun taskConfigs(maxTasks: Int): List> = List(maxTasks) { props } + override fun taskConfigs(maxTasks: Int): List> = + (0 until maxTasks).toList().map { + buildMap { + putAll(props) + put(TASK_ID, it.toString()) + } + } override fun stop() {} diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt index d385b50ba..56402f194 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt @@ -22,14 +22,18 @@ import org.apache.kafka.connect.sink.SinkRecord import org.apache.kafka.connect.sink.SinkTask import org.jetbrains.annotations.VisibleForTesting import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil +import org.neo4j.connectors.kafka.metrics.Metrics +import org.neo4j.connectors.kafka.metrics.MetricsFactory import org.slf4j.Logger import org.slf4j.LoggerFactory -class Neo4jSinkTask : SinkTask() { +class Neo4jSinkTask(private val metricsFactory: MetricsFactory = MetricsFactory()) : SinkTask() { private val log: Logger = LoggerFactory.getLogger(Neo4jSinkTask::class.java) private lateinit var settings: Map @VisibleForTesting lateinit var config: SinkConfiguration + private lateinit var metrics: Metrics + private lateinit var topicHandlers: Map override fun version(): String = VersionUtil.version(Neo4jSinkTask::class.java) @@ -38,6 +42,10 @@ class Neo4jSinkTask : SinkTask() { settings = props!! config = SinkConfiguration(settings) + + metrics = metricsFactory.createMetrics(config) + topicHandlers = SinkStrategyHandler.createFrom(config, metrics) + config.validateAllTopics(topicHandlers) } override fun stop() { @@ -45,6 +53,9 @@ class Neo4jSinkTask : SinkTask() { if (this::config.isInitialized) { config.close() } + if (this::metrics.isInitialized) { + metrics.close() + } } override fun put(records: Collection?) { @@ -53,7 +64,7 @@ class Neo4jSinkTask : SinkTask() { records ?.map { SinkMessage(it) } ?.groupBy { it.topic } - ?.mapKeys { config.topicHandlers.getValue(it.key) } + ?.mapKeys { topicHandlers.getValue(it.key) } ?.forEach { (handler, messages) -> processMessages(handler, messages) } } log.info("processed {} records in {} ms", records?.size ?: 0, duration.inWholeMilliseconds) @@ -83,6 +94,7 @@ class Neo4jSinkTask : SinkTask() { ) log.trace("after write transaction for group {}", index) } + handler.postProcessLastMessageBatch(group) handled.addAll(group.flatMap { it.messages }) } diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt index da58cfe96..6dbc27930 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkConfiguration.kt @@ -51,7 +51,6 @@ class SinkConfiguration : Neo4jConfiguration { fixedRenderer = renderer _neo4j = neo4j this.apocCypherDoItAvailable = apocCypherDoItAvailable - validateAllTopics() } val batchSize @@ -125,14 +124,10 @@ class SinkConfiguration : Neo4jConfiguration { originalsStrings()[SinkTask.TOPICS_CONFIG]?.split(',')?.map { it.trim() }?.toList() ?: emptyList() - val topicHandlers: Map by lazy { - SinkStrategyHandler.createFrom(this) - } - override fun userAgentComment(): String = SinkStrategyHandler.configuredStrategies(this).sorted().joinToString("; ") - private fun validateAllTopics() { + fun validateAllTopics(topicHandlers: Map) { val sourceTopics = topicNames.toSet() val configuredTopics = topicHandlers.keys diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt index 57f70badc..0b61a0957 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt @@ -26,6 +26,7 @@ import org.neo4j.connectors.kafka.data.cdcTxId import org.neo4j.connectors.kafka.data.cdcTxSeq import org.neo4j.connectors.kafka.data.fetchConstraintData import org.neo4j.connectors.kafka.data.isCdcMessage +import org.neo4j.connectors.kafka.metrics.Metrics import org.neo4j.connectors.kafka.sink.strategy.CudHandler import org.neo4j.connectors.kafka.sink.strategy.CypherHandler import org.neo4j.connectors.kafka.sink.strategy.NodePatternHandler @@ -133,13 +134,19 @@ interface SinkStrategyHandler { */ fun handle(messages: Iterable): Iterable> + fun postProcessLastMessageBatch(group: Iterable) {} + companion object { - fun createFrom(config: SinkConfiguration): Map { - return config.topicNames.associateWith { topic -> createForTopic(topic, config) } + fun createFrom(config: SinkConfiguration, metrics: Metrics): Map { + return config.topicNames.associateWith { topic -> createForTopic(topic, config, metrics) } } - private fun createForTopic(topic: String, config: SinkConfiguration): SinkStrategyHandler { + private fun createForTopic( + topic: String, + config: SinkConfiguration, + metrics: Metrics, + ): SinkStrategyHandler { var handler: SinkStrategyHandler? = null val originals = config.originalsStrings() @@ -237,6 +244,7 @@ interface SinkStrategyHandler { SinkStrategy.CDC_SOURCE_ID, batchStrategy, CdcSourceIdEventTransformer(topic, labelName, propertyName), + metrics, ) } @@ -265,7 +273,12 @@ interface SinkStrategyHandler { } handler = - CdcHandler(SinkStrategy.CDC_SCHEMA, batchStrategy, CdcSchemaEventTransformer(topic)) + CdcHandler( + SinkStrategy.CDC_SCHEMA, + batchStrategy, + CdcSchemaEventTransformer(topic), + metrics, + ) } val cudTopics = config.getList(SinkConfiguration.CUD_TOPICS) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcHandler.kt index d8d5301bb..3ae3ef139 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcHandler.kt @@ -18,8 +18,11 @@ package org.neo4j.connectors.kafka.sink.strategy.cdc import org.apache.kafka.connect.data.Struct import org.neo4j.cdc.client.model.ChangeEvent +import org.neo4j.connectors.kafka.configuration.ConnectorType.SINK import org.neo4j.connectors.kafka.data.StreamsTransactionEventExtensions.toChangeEvent import org.neo4j.connectors.kafka.data.toChangeEvent +import org.neo4j.connectors.kafka.metrics.CdcMetricsData +import org.neo4j.connectors.kafka.metrics.Metrics import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.connectors.kafka.sink.SinkMessage import org.neo4j.connectors.kafka.sink.SinkStrategy @@ -31,13 +34,20 @@ class CdcHandler( private val strategy: SinkStrategy, internal val batchStrategy: CdcBatchStrategy, internal val eventTransformer: CdcEventTransformer, + metrics: Metrics, ) : SinkStrategyHandler { + private val metricsData = CdcMetricsData(metrics, SINK) + override fun strategy(): SinkStrategy = strategy override fun handle(messages: Iterable): Iterable> { return batchStrategy.handle(messages) { eventTransformer.transform(it) } } + + override fun postProcessLastMessageBatch(group: Iterable) { + group.lastOrNull()?.messages?.lastOrNull()?.toChangeEvent()?.let { metricsData.update(it) } + } } internal fun SinkMessage.toChangeEvent(): ChangeEvent = diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt index 3446cdc31..549d6bec6 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt @@ -29,11 +29,13 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.EnumSource import org.junit.jupiter.params.provider.MethodSource +import org.mockito.kotlin.mock import org.neo4j.caniuse.Neo4j import org.neo4j.caniuse.Neo4jDeploymentType import org.neo4j.caniuse.Neo4jEdition import org.neo4j.caniuse.Neo4jVersion import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration +import org.neo4j.connectors.kafka.metrics.Metrics import org.neo4j.connectors.kafka.sink.strategy.CudHandler import org.neo4j.connectors.kafka.sink.strategy.CypherHandler import org.neo4j.connectors.kafka.sink.strategy.NodePatternHandler @@ -45,6 +47,8 @@ import org.neo4j.driver.TransactionConfig class SinkConfigurationTest { + val metricsMock: Metrics = mock() + @Test fun `should throw a ConfigException because of mismatch`() { shouldThrow { @@ -56,7 +60,9 @@ class SinkConfigurationTest { "${SinkConfiguration.CYPHER_TOPIC_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})", ) - SinkConfiguration(originals, Renderer.getDefaultRenderer()) + val config = SinkConfiguration(originals, Renderer.getDefaultRenderer()) + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + config.validateAllTopics(topicHandlers) } shouldHaveMessage "Topic 'bar' is not assigned a sink strategy" } @@ -75,7 +81,9 @@ class SinkConfigurationTest { SinkConfiguration.CDC_SOURCE_ID_TOPICS to "foo", ) - SinkConfiguration(originals, Renderer.getDefaultRenderer()) + val config = SinkConfiguration(originals, Renderer.getDefaultRenderer()) + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + config.validateAllTopics(topicHandlers) } shouldHaveMessage "Topic 'foo' has multiple strategies defined" } @@ -94,9 +102,11 @@ class SinkConfigurationTest { val config = SinkConfiguration(originals, Renderer.getDefaultRenderer()) config.batchSize shouldBe 10 - config.topicHandlers shouldHaveKey "foo" - config.topicHandlers["foo"] shouldBe instanceOf() - (config.topicHandlers["foo"] as CypherHandler).query shouldBe + + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers shouldHaveKey "foo" + topicHandlers["foo"] shouldBe instanceOf() + (topicHandlers["foo"] as CypherHandler).query shouldBe "CREATE (p:Person{name: event.firstName})" } @@ -114,9 +124,11 @@ class SinkConfigurationTest { val config = SinkConfiguration(originals, Renderer.getDefaultRenderer()) config.batchSize shouldBe 10 - config.topicHandlers shouldHaveKey "foo" - config.topicHandlers["foo"] shouldBe instanceOf() - (config.topicHandlers["foo"] as NodePatternHandler).pattern shouldBe + + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers shouldHaveKey "foo" + topicHandlers["foo"] shouldBe instanceOf() + (topicHandlers["foo"] as NodePatternHandler).pattern shouldBe NodePattern( setOf("Foo"), false, @@ -125,9 +137,9 @@ class SinkConfigurationTest { emptySet(), ) - config.topicHandlers shouldHaveKey "bar" - config.topicHandlers["bar"] shouldBe instanceOf() - (config.topicHandlers["bar"] as NodePatternHandler).pattern shouldBe + topicHandlers shouldHaveKey "bar" + topicHandlers["bar"] shouldBe instanceOf() + (topicHandlers["bar"] as NodePatternHandler).pattern shouldBe NodePattern( setOf("Bar"), false, @@ -199,11 +211,12 @@ class SinkConfigurationTest { neo4j = neo4j5_26, ) - config.topicHandlers shouldHaveKey "foo" - config.topicHandlers["foo"] shouldBe instanceOf() + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers shouldHaveKey "foo" + topicHandlers["foo"] shouldBe instanceOf() - config.topicHandlers shouldHaveKey "bar" - config.topicHandlers["bar"] shouldBe instanceOf() + topicHandlers shouldHaveKey "bar" + topicHandlers["bar"] shouldBe instanceOf() } @ParameterizedTest @@ -228,11 +241,12 @@ class SinkConfigurationTest { apocCypherDoItAvailable = apocDoItAvailable, ) - config.topicHandlers shouldHaveKey "foo" - config.topicHandlers["foo"] shouldBe instanceOf(clazz) + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers shouldHaveKey "foo" + topicHandlers["foo"] shouldBe instanceOf(clazz) - config.topicHandlers shouldHaveKey "bar" - config.topicHandlers["bar"] shouldBe instanceOf(clazz) + topicHandlers shouldHaveKey "bar" + topicHandlers["bar"] shouldBe instanceOf(clazz) } @Test @@ -246,11 +260,12 @@ class SinkConfigurationTest { ) val config = SinkConfiguration(originals, Renderer.getDefaultRenderer()) - config.topicHandlers shouldHaveKey "foo" - config.topicHandlers["foo"] shouldBe instanceOf() + val topicHandlers = SinkStrategyHandler.createFrom(config, metricsMock) + topicHandlers shouldHaveKey "foo" + topicHandlers["foo"] shouldBe instanceOf() - config.topicHandlers shouldHaveKey "bar" - config.topicHandlers["bar"] shouldBe instanceOf() + topicHandlers shouldHaveKey "bar" + topicHandlers["bar"] shouldBe instanceOf() } @ParameterizedTest diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaHandlerIT.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaHandlerIT.kt index c0011a218..81efe6bc0 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaHandlerIT.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaHandlerIT.kt @@ -36,8 +36,10 @@ import org.neo4j.cdc.client.model.NodeEvent import org.neo4j.cdc.client.model.NodeState import org.neo4j.cdc.client.model.RelationshipEvent import org.neo4j.cdc.client.model.RelationshipState +import org.neo4j.connectors.kafka.metrics.Metrics import org.neo4j.connectors.kafka.sink.Neo4jSinkTask import org.neo4j.connectors.kafka.sink.SinkStrategy.CDC_SCHEMA +import org.neo4j.connectors.kafka.sink.SinkStrategyHandler import org.neo4j.connectors.kafka.sink.strategy.TestUtils.newChangeEventMessage import org.neo4j.connectors.kafka.sink.strategy.TestUtils.verifyEosOffsetIfEnabled import org.neo4j.connectors.kafka.testing.DatabaseSupport.createDatabase @@ -95,7 +97,8 @@ abstract class CdcSchemaHandlerIT( } ) - val handler = task.config.topicHandlers["my-topic"] + val metricsMock: Metrics = mock() + val handler = SinkStrategyHandler.createFrom(task.config, metricsMock)["my-topic"] handler shouldBe instanceOf() val cdcHandler = handler as CdcHandler diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSourceIdHandlerIT.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSourceIdHandlerIT.kt index 061965442..49c8e0a33 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSourceIdHandlerIT.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSourceIdHandlerIT.kt @@ -36,8 +36,10 @@ import org.neo4j.cdc.client.model.NodeEvent import org.neo4j.cdc.client.model.NodeState import org.neo4j.cdc.client.model.RelationshipEvent import org.neo4j.cdc.client.model.RelationshipState +import org.neo4j.connectors.kafka.metrics.Metrics import org.neo4j.connectors.kafka.sink.Neo4jSinkTask import org.neo4j.connectors.kafka.sink.SinkStrategy.CDC_SOURCE_ID +import org.neo4j.connectors.kafka.sink.SinkStrategyHandler import org.neo4j.connectors.kafka.sink.strategy.TestUtils.newChangeEventMessage import org.neo4j.connectors.kafka.sink.strategy.TestUtils.verifyEosOffsetIfEnabled import org.neo4j.connectors.kafka.testing.DatabaseSupport.createDatabase @@ -110,7 +112,8 @@ abstract class CdcSourceIdHandlerIT( } ) - val handler = task.config.topicHandlers["my-topic"] + val metricsMock: Metrics = mock() + val handler = SinkStrategyHandler.createFrom(task.config, metricsMock)["my-topic"] handler shouldBe instanceOf() val cdcHandler = handler as CdcHandler diff --git a/source-connector/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jConnector.kt b/source-connector/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jConnector.kt index 4ef671f66..a048c2f93 100644 --- a/source-connector/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jConnector.kt +++ b/source-connector/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jConnector.kt @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.connect.connector.Task import org.apache.kafka.connect.source.ExactlyOnceSupport import org.apache.kafka.connect.source.SourceConnector +import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration.Companion.TASK_ID import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil import org.neo4j.connectors.kafka.source.SourceConfiguration.Companion.STRATEGY @@ -44,7 +45,13 @@ class Neo4jConnector : SourceConnector() { SourceType.QUERY -> Neo4jQueryTask::class.java } - override fun taskConfigs(maxTasks: Int): List> = listOf(props) + override fun taskConfigs(maxTasks: Int): List> = + (0 until maxTasks).toList().map { + buildMap { + putAll(props) + put(TASK_ID, it.toString()) + } + } override fun stop() {} diff --git a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcTask.kt b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcTask.kt index 90b6325ab..f8c1c0475 100644 --- a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcTask.kt +++ b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcTask.kt @@ -24,6 +24,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.runBlocking @@ -33,16 +34,20 @@ import org.neo4j.cdc.client.CDCClient import org.neo4j.cdc.client.CDCService import org.neo4j.cdc.client.model.ChangeEvent import org.neo4j.cdc.client.model.ChangeIdentifier +import org.neo4j.connectors.kafka.configuration.ConnectorType.SOURCE import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil import org.neo4j.connectors.kafka.data.ChangeEventConverter import org.neo4j.connectors.kafka.data.Headers -import org.neo4j.connectors.kafka.data.ValueConverter +import org.neo4j.connectors.kafka.metrics.CdcMetricsData +import org.neo4j.connectors.kafka.metrics.DbTransactionMetricsData +import org.neo4j.connectors.kafka.metrics.Metrics +import org.neo4j.connectors.kafka.metrics.MetricsFactory import org.neo4j.driver.SessionConfig import org.neo4j.driver.TransactionConfig import org.slf4j.Logger import org.slf4j.LoggerFactory -class Neo4jCdcTask : SourceTask() { +class Neo4jCdcTask(private val metricsFactory: MetricsFactory = MetricsFactory()) : SourceTask() { private val log: Logger = LoggerFactory.getLogger(Neo4jCdcTask::class.java) private lateinit var settings: Map @@ -51,11 +56,14 @@ class Neo4jCdcTask : SourceTask() { private lateinit var transactionConfig: TransactionConfig private lateinit var cdc: CDCService private lateinit var offset: AtomicReference - private lateinit var converter: ValueConverter private lateinit var changeEventConverter: ChangeEventConverter internal fun latestOffset(): String = offset.get() + private lateinit var metrics: Metrics + private lateinit var metricsData: CdcMetricsData + private lateinit var dbTransactionMetricsData: DbTransactionMetricsData + override fun version(): String = VersionUtil.version(this.javaClass as Class<*>) override fun start(props: Map?) { @@ -70,6 +78,8 @@ class Neo4jCdcTask : SourceTask() { sessionConfig = configBuilder.build() transactionConfig = config.txConfig() + metrics = metricsFactory.createMetrics(config) + cdc = CDCClient( config.driver, @@ -84,17 +94,36 @@ class Neo4jCdcTask : SourceTask() { log.info("resuming from offset: ${offset.get()}") changeEventConverter = ChangeEventConverter(config.payloadMode) + + metricsData = CdcMetricsData(metrics, SOURCE) + if (config.lastDbTxIdEnabled) { + dbTransactionMetricsData = + DbTransactionMetricsData( + metrics = metrics, + neo4jDriver = config.driver, + databaseName = config.database, + transactionConfig = transactionConfig, + refreshInterval = config.lastDbTxIdRefreshInterval, + ) + } } override fun stop() { log.info("stopping") config.close() + if (this::dbTransactionMetricsData.isInitialized) { + dbTransactionMetricsData.close() + } + if (this::metrics.isInitialized) { + metrics.close() + } } @OptIn(ExperimentalCoroutinesApi::class) override fun poll(): MutableList { log.info("polling from offset: ${offset.get()}") val list = mutableListOf() + var lastChangeEvent: ChangeEvent? = null runBlocking { val timeSource = TimeSource.Monotonic @@ -105,6 +134,7 @@ class Neo4jCdcTask : SourceTask() { cdc.query(ChangeIdentifier(offset.get()), { lastKnownId -> offset.set(lastKnownId.id) }) .take(config.batchSize.toLong(), true) .asFlow() + .onEach { lastChangeEvent = it } .flatMapConcat { build(it) } .toList(list) if (list.isNotEmpty()) { @@ -114,8 +144,9 @@ class Neo4jCdcTask : SourceTask() { delay(config.cdcPollingInterval) } - if (list.isNotEmpty()) { - offset.set(list.last().sourceOffset()["value"] as String) + if (lastChangeEvent != null) { + offset.set(lastChangeEvent.id.id) + metricsData.update(lastChangeEvent) } } diff --git a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt index ac4875791..0b7969704 100644 --- a/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt +++ b/source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt @@ -22,6 +22,7 @@ import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration import org.apache.kafka.common.config.Config import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.ConfigDef.Importance import org.apache.kafka.common.config.ConfigDef.Range import org.apache.kafka.common.config.ConfigException import org.neo4j.cdc.client.model.EntityOperation @@ -119,6 +120,12 @@ class SourceConfiguration(originals: Map<*, *>) : val cdcPollingDuration get(): Duration = Duration.parseSimpleString(getString(CDC_POLL_DURATION)) + val lastDbTxIdEnabled + get(): Boolean = getString(CDC_METRIC_LAST_TX_ID_ENABLED).toBoolean() + + val lastDbTxIdRefreshInterval + get(): Duration = Duration.parseSimpleString(getString(CDC_METRIC_LAST_TX_ID_REFRESH_INTERVAL)) + val cdcSelectorsToTopics: Map> by lazy { when (strategy) { SourceType.CDC -> { @@ -515,13 +522,17 @@ class SourceConfiguration(originals: Map<*, *>) : "^neo4j\\.cdc\\.topic\\.(?<$GROUP_NAME_TOPIC>[a-zA-Z0-9._-]+)(\\.patterns)\\.(?<$GROUP_NAME_INDEX>[0-9]+)(\\.metadata)\\.(?<$GROUP_NAME_METADATA>[a-zA-Z0-9._-]+)$" ) + const val CDC_METRIC_LAST_TX_ID_ENABLED = "neo4j.cdc.metric.last-db-tx-id.enabled" + const val CDC_METRIC_LAST_TX_ID_REFRESH_INTERVAL = + "neo4j.cdc.metric.last-db-tx-id.refresh-interval" + private val DEFAULT_QUERY_POLL_INTERVAL = 1.seconds private val DEFAULT_QUERY_POLL_DURATION = 5.seconds private const val DEFAULT_BATCH_SIZE = 1000 private val DEFAULT_QUERY_TIMEOUT = 0.seconds private const val DEFAULT_QUERY_FORCE_MAPS_AS_STRUCT = true - private val DEFAULT_CDC_USE_LEADER = false + private const val DEFAULT_CDC_USE_LEADER = false private val DEFAULT_CDC_POLL_INTERVAL = 1.seconds private val DEFAULT_CDC_POLL_DURATION = 5.seconds private const val DEFAULT_STREAMING_PROPERTY = "timestamp" @@ -753,5 +764,24 @@ class SourceConfiguration(originals: Map<*, *>) : recommender = Recommenders.enum(PayloadMode::class.java) } ) + .define( + ConfigKeyBuilder.of(CDC_METRIC_LAST_TX_ID_ENABLED, ConfigDef.Type.STRING) { + importance = Importance.LOW + defaultValue = "false" + group = Groups.CONNECTOR_ADVANCED.title + validator = Validators.bool() + recommender = Recommenders.bool() + documentation = "Whether to enable the `last_db_tx_id` metric." + } + ) + .define( + ConfigKeyBuilder.of(CDC_METRIC_LAST_TX_ID_REFRESH_INTERVAL, ConfigDef.Type.STRING) { + importance = Importance.LOW + defaultValue = 30.seconds.toSimpleString() + group = Groups.CONNECTOR_ADVANCED.title + validator = Validators.pattern(SIMPLE_DURATION_PATTERN) + documentation = "The refresh interval of the `last_db_tx_id` metric." + } + ) } } diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/SourceConfigurationTest.kt b/source/src/test/kotlin/org/neo4j/connectors/kafka/source/SourceConfigurationTest.kt index 4073cd9d7..d19d35d81 100644 --- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/SourceConfigurationTest.kt +++ b/source/src/test/kotlin/org/neo4j/connectors/kafka/source/SourceConfigurationTest.kt @@ -23,6 +23,8 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.throwable.shouldHaveMessage import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds import org.apache.kafka.common.config.ConfigException @@ -35,6 +37,8 @@ import org.neo4j.cdc.client.selector.RelationshipSelector import org.neo4j.connectors.kafka.configuration.AuthenticationType import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration import org.neo4j.connectors.kafka.configuration.PayloadMode +import org.neo4j.connectors.kafka.source.SourceConfiguration.Companion.CDC_METRIC_LAST_TX_ID_ENABLED +import org.neo4j.connectors.kafka.source.SourceConfiguration.Companion.CDC_METRIC_LAST_TX_ID_REFRESH_INTERVAL import org.neo4j.driver.AccessMode import org.neo4j.driver.TransactionConfig @@ -796,4 +800,24 @@ class SourceConfigurationTest { config.txConfig() shouldBe TransactionConfig.builder().withMetadata(mapOf("app" to "kafka-source")).build() } + + @Test + fun `metric settings`() { + SourceConfiguration(mapOf(Neo4jConfiguration.URI to "bolt://localhost")).run { + assertFalse(this.lastDbTxIdEnabled) + assertEquals(30.seconds, this.lastDbTxIdRefreshInterval) + } + + SourceConfiguration( + mapOf( + Neo4jConfiguration.URI to "bolt://localhost", + CDC_METRIC_LAST_TX_ID_ENABLED to "true", + CDC_METRIC_LAST_TX_ID_REFRESH_INTERVAL to "1m", + ) + ) + .run { + assertTrue(this.lastDbTxIdEnabled) + assertEquals(1.minutes, this.lastDbTxIdRefreshInterval) + } + } }