Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
55d08d9
feat: add kafka metrics to apoc cdc handler
venikkin Feb 20, 2026
d2d7b7a
feat: add metrics data class
venikkin Feb 23, 2026
7bdb98c
Merge remote-tracking branch 'origin/main' into cdc-metrics-native
venikkin Feb 23, 2026
9b98c9c
feat: update cdc sink handler
venikkin Feb 23, 2026
94a8d08
feat: add source metrics
venikkin Feb 23, 2026
d20ca20
feat: implement jmx reporter
venikkin Feb 24, 2026
cc01c15
refactor: clean up
venikkin Feb 24, 2026
f5fed7c
chore: spotless
venikkin Feb 24, 2026
8cd6cfd
refactor: use write access mode to retrieve last db tx id from a leader
venikkin Feb 24, 2026
053fe21
refactor: parameterize last db metric
venikkin Feb 24, 2026
b51e87e
fix: update unit tests
venikkin Feb 24, 2026
a666fc1
refactor: downgrade to kafka 3.8
venikkin Feb 25, 2026
b8abc09
Merge branch 'main' into cdc-metrics-native
venikkin Feb 25, 2026
39d659d
test: add metric tests
venikkin Feb 25, 2026
bf7b24b
chore: update test description
venikkin Feb 25, 2026
ba7bdf0
chore: spotless apply
venikkin Feb 26, 2026
5d464ad
refactor: move tx id properties to source config
venikkin Feb 26, 2026
167f58a
chore: update default value
venikkin Feb 26, 2026
a0e4635
refactor: review fixes
venikkin Feb 27, 2026
bfa81f6
refactor: pass databaseName to db tx metric data
venikkin Feb 27, 2026
d3356cb
fix: update unit test
venikkin Feb 27, 2026
c6d906b
fix: typo
venikkin Feb 27, 2026
d2a1084
Merge branch 'main' into cdc-metrics-native
venikkin Feb 27, 2026
1f3e3fe
refactor: update metrics description
venikkin Feb 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.Closeable
import java.io.File
import java.net.URI
import java.util.concurrent.TimeUnit
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import org.apache.kafka.common.config.AbstractConfig
Expand Down Expand Up @@ -70,33 +71,29 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
get(): List<URI> = getList(URI).map { URI(it) }

internal val connectionTimeout
get(): kotlin.time.Duration =
kotlin.time.Duration.parseSimpleString(getString(CONNECTION_TIMEOUT))
get(): Duration = Duration.parseSimpleString(getString(CONNECTION_TIMEOUT))

internal val maxRetryTime
get(): kotlin.time.Duration =
kotlin.time.Duration.parseSimpleString(getString(MAX_TRANSACTION_RETRY_TIMEOUT))
get(): Duration = Duration.parseSimpleString(getString(MAX_TRANSACTION_RETRY_TIMEOUT))

internal val maxConnectionPoolSize
get(): Int = getInt(POOL_MAX_CONNECTION_POOL_SIZE)

internal val connectionAcquisitionTimeout
get(): kotlin.time.Duration =
kotlin.time.Duration.parseSimpleString(getString(POOL_CONNECTION_ACQUISITION_TIMEOUT))
get(): Duration = Duration.parseSimpleString(getString(POOL_CONNECTION_ACQUISITION_TIMEOUT))

internal val idleTimeBeforeTest
get(): kotlin.time.Duration =
get(): Duration =
getString(POOL_IDLE_TIME_BEFORE_TEST).orEmpty().run {
if (this.isEmpty()) {
(-1).milliseconds
} else {
kotlin.time.Duration.parseSimpleString(this)
Duration.parseSimpleString(this)
}
}

internal val maxConnectionLifetime
get(): kotlin.time.Duration =
kotlin.time.Duration.parseSimpleString(getString(POOL_MAX_CONNECTION_LIFETIME))
get(): Duration = Duration.parseSimpleString(getString(POOL_MAX_CONNECTION_LIFETIME))

internal val encrypted
get(): Boolean = getString(SECURITY_ENCRYPTED).toBoolean()
Expand Down Expand Up @@ -239,6 +236,12 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
}
}

val connectorName
get(): String = originals()[CONNECTOR_NAME].toString()

val taskId
get(): String = originals()[TASK_ID].toString()

companion object {
val DEFAULT_MAX_RETRY_DURATION = 30.seconds

Expand Down Expand Up @@ -271,6 +274,10 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
const val SECURITY_TRUST_STRATEGY = "neo4j.security.trust-strategy"
const val SECURITY_CERT_FILES = "neo4j.security.cert-files"

// internal properties
const val CONNECTOR_NAME = "name"
const val TASK_ID = "neo4j.task.id"

/** Perform validation on dependent configuration items */
fun validate(config: org.apache.kafka.common.config.Config) {
// authentication configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.metrics

import java.util.concurrent.atomic.AtomicLong
import org.neo4j.cdc.client.model.ChangeEvent

/**
 * Holds the values behind the CDC gauges and registers those gauges with [metrics].
 *
 * Three gauges are registered on construction: `last_cdc_tx_commit_timestamp`,
 * `last_cdc_tx_start_timestamp` and `last_cdc_tx_id`. Each reports `0` until [update] has been
 * called.
 *
 * @param metrics registry the gauges are added to
 * @param tags tags handed to every gauge registration
 */
class CdcMetricsData(metrics: Metrics, tags: LinkedHashMap<String, String> = linkedMapOf()) {

  // Only the wrapped values change, never the references, so these are read-only properties.
  private val lastTxCommitTs = AtomicLong(0L)
  private val lastTxStartTs = AtomicLong(0L)
  private val lastTxId = AtomicLong(0L)

  init {
    metrics.addGauge(
        "last_cdc_tx_commit_timestamp",
        "The transaction commit timestamp of the last processed CDC message",
        tags,
    ) {
      lastTxCommitTs.get()
    }
    metrics.addGauge(
        "last_cdc_tx_start_timestamp",
        "The transaction start timestamp of the last processed CDC message",
        tags,
    ) {
      lastTxStartTs.get()
    }
    metrics.addGauge(
        "last_cdc_tx_id",
        "The transaction id of the last processed CDC message",
        tags,
    ) {
      lastTxId.get()
    }
  }

  /**
   * Records the transaction details of [event] so that the gauges report them.
   *
   * The commit and start timestamps are stored as epoch seconds and are only refreshed when the
   * event carries metadata; the transaction id is always refreshed.
   */
  fun update(event: ChangeEvent) {
    event.metadata?.let { metadata ->
      lastTxCommitTs.set(metadata.txCommitTime.toEpochSecond())
      lastTxStartTs.set(metadata.txStartTime.toEpochSecond())
    }
    lastTxId.set(event.txId)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.metrics

import java.io.Closeable
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.neo4j.driver.AccessMode
import org.neo4j.driver.Driver
import org.neo4j.driver.SessionConfig
import org.neo4j.driver.TransactionConfig

/**
 * Polls the database for its last committed transaction id and exposes it as the `last_db_tx_id`
 * gauge.
 *
 * Polling starts in a background coroutine as soon as the instance is constructed and repeats
 * every [refreshInterval] until [close] is called. The gauge reports `0` until the first
 * successful poll. A failed poll is logged and the previous value is kept; polling continues on
 * the next interval.
 *
 * @param metrics registry the gauge is added to
 * @param tags tags handed to the gauge registration
 * @param refreshInterval pause between two polls
 * @param neo4jDriver driver used to open a session for every poll
 * @param sessionConfig base session settings; database, fetch size, impersonated user and
 *   bookmarks are copied, the access mode is always set to WRITE
 * @param transactionConfig transaction settings applied to the polling query
 * @param dispatcher dispatcher the polling coroutine runs on
 */
class DbTransactionMetricsData(
    metrics: Metrics,
    tags: LinkedHashMap<String, String> = linkedMapOf(),
    refreshInterval: Duration,
    neo4jDriver: Driver,
    sessionConfig: SessionConfig,
    transactionConfig: TransactionConfig,
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
) : Closeable {

  // Copy of the supplied session config with the default access mode switched to WRITE.
  private val writeAccessModeSessionConfig: SessionConfig by lazy {
    val builder = SessionConfig.builder()

    sessionConfig.database().ifPresent { builder.withDatabase(it) }
    sessionConfig.fetchSize().ifPresent { builder.withFetchSize(it) }
    sessionConfig.impersonatedUser().ifPresent { builder.withImpersonatedUser(it) }
    sessionConfig.bookmarks()?.let { builder.withBookmarks(it) }

    builder.withDefaultAccessMode(AccessMode.WRITE)
    builder.build()
  }

  private val lastTransactionId = AtomicLong(0)
  private val scope = CoroutineScope(dispatcher + Job())
  private val log =
      java.util.logging.Logger.getLogger(DbTransactionMetricsData::class.java.name)

  init {
    metrics.addGauge(
        "last_db_tx_id",
        "The id of the last transaction committed to the database",
        tags,
    ) {
      lastTransactionId.get()
    }

    scope.launch {
      val databaseName = writeAccessModeSessionConfig.database().orElse("neo4j")
      while (isActive) {
        // Only the blocking driver call is guarded: a single failed poll must not end the loop
        // and freeze the gauge. `delay` stays outside the try so that cancellation still
        // terminates the coroutine.
        try {
          // NOTE(review): the database name is interpolated unquoted - confirm that names which
          // need backtick quoting cannot reach this point.
          // NOTE(review): `single()` fails unless exactly one record is returned - confirm this
          // holds for clustered deployments.
          val txId =
              neo4jDriver.session(writeAccessModeSessionConfig).use { session ->
                session
                    .run(
                        "SHOW DATABASE $databaseName YIELD lastCommittedTxn RETURN lastCommittedTxn as txId",
                        transactionConfig,
                    )
                    .single()
                    .get("txId")
                    .asLong()
              }
          lastTransactionId.set(txId)
        } catch (e: Exception) {
          log.log(
              java.util.logging.Level.WARNING,
              "Failed to refresh the last_db_tx_id metric for database '$databaseName', " +
                  "keeping the previous value",
              e,
          )
        }

        delay(refreshInterval)
      }
    }
  }

  /** Cancels the polling coroutine; the gauge keeps reporting the last value that was read. */
  override fun close() {
    scope.cancel()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.metrics

import java.lang.management.ManagementFactory
import java.util.Hashtable
import java.util.concurrent.ConcurrentHashMap
import javax.management.Attribute
import javax.management.AttributeList
import javax.management.DynamicMBean
import javax.management.MBeanAttributeInfo
import javax.management.MBeanInfo
import javax.management.ObjectName
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration

/**
 * [Metrics] implementation that exposes the registered gauges as read-only attributes of a single
 * JMX dynamic MBean.
 *
 * The MBean is registered with the platform MBean server on construction under the domain
 * `kafka.connect` with the key properties `type=plugins`, `connector=<connector name>` and
 * `task=<task id>`. An MBean that is already registered under that name is replaced. [close]
 * unregisters the MBean again.
 *
 * @param config configuration the connector name and task id are read from
 */
class JmxMetrics(config: Neo4jConfiguration) : Metrics, DynamicMBean {

  private val connectorName: String = config.connectorName
  private val taskId: String = config.taskId
  private val objectName: ObjectName =
      ObjectName(
          "kafka.connect",
          Hashtable<String, String>().apply {
            put("type", "plugins")
            // Values come from user configuration and may contain characters that are illegal in
            // an unquoted ObjectName value; those are quoted, everything else is used verbatim.
            put("connector", quoteIfNeeded(connectorName))
            put("task", quoteIfNeeded(taskId))
          },
      )

  private val gauges = ConcurrentHashMap<String, Gauge<*>>()
  private val mbs = ManagementFactory.getPlatformMBeanServer()

  init {
    // Replace a bean left over under the same name instead of failing the registration.
    if (mbs.isRegistered(objectName)) {
      mbs.unregisterMBean(objectName)
    }
    mbs.registerMBean(this, objectName)
  }

  /**
   * Registers a gauge under [name], replacing any gauge previously registered with that name.
   *
   * [tags] are currently not used by this implementation.
   */
  override fun <T : Number> addGauge(
      name: String,
      description: String,
      tags: LinkedHashMap<String, String>,
      valueProvider: () -> T?,
  ) {
    gauges[name] = Gauge(name, description, valueProvider)
  }

  /**
   * Returns the current value of the gauge named [attribute]; the value itself may be `null`.
   *
   * @throws javax.management.AttributeNotFoundException if no gauge with that name is registered
   */
  override fun getAttribute(attribute: String?): Any? {
    // An unknown attribute must be distinguishable from a gauge that currently has no value.
    val gauge =
        attribute?.let { gauges[it] }
            ?: throw javax.management.AttributeNotFoundException("Unknown attribute: $attribute")
    return gauge.valueProvider.invoke()
  }

  /** Always fails: the attributes of this bean are read-only. */
  override fun setAttribute(attribute: Attribute?) {
    throw UnsupportedOperationException("Attributes are read-only")
  }

  /**
   * Returns the values of the requested [attributes]. Unknown names and gauges without a current
   * value are left out of the result.
   */
  override fun getAttributes(attributes: Array<out String>?): AttributeList {
    val list = AttributeList()
    attributes?.forEach { name ->
      gauges[name]?.valueProvider?.invoke()?.let { value -> list.add(Attribute(name, value)) }
    }
    return list
  }

  /** Always fails: the attributes of this bean are read-only. */
  override fun setAttributes(attributes: AttributeList?): AttributeList {
    throw UnsupportedOperationException("Attributes are read-only")
  }

  /** Always fails: this bean exposes no operations. */
  override fun invoke(
      actionName: String?,
      params: Array<out Any>?,
      signature: Array<out String>?,
  ): Any {
    throw UnsupportedOperationException("Operations are not supported")
  }

  /** Describes every currently registered gauge as a readable, non-writable attribute. */
  override fun getMBeanInfo(): MBeanInfo {
    val attrs =
        gauges.values.map { gauge ->
          MBeanAttributeInfo(gauge.name, "java.lang.Number", gauge.description, true, false, false)
        }

    return MBeanInfo(
        this.javaClass.name,
        "Neo4j Kafka Connector JMX Metrics",
        attrs.toTypedArray(),
        null,
        null,
        null,
    )
  }

  /** Name, description and value supplier of a single registered gauge. */
  private class Gauge<T : Number>(
      val name: String,
      val description: String,
      val valueProvider: () -> T?,
  )

  /** Unregisters the MBean if it is still registered. */
  override fun close() {
    if (mbs.isRegistered(objectName)) {
      mbs.unregisterMBean(objectName)
    }
  }

  private companion object {
    /** Characters that may not appear in, or change the meaning of, an unquoted value. */
    private const val OBJECT_NAME_SPECIAL_CHARS = ",=:\"*?\n"

    /** Returns [value] unchanged when it is a legal unquoted value, otherwise its quoted form. */
    private fun quoteIfNeeded(value: String): String =
        if (value.any { it in OBJECT_NAME_SPECIAL_CHARS }) ObjectName.quote(value) else value
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.metrics

import java.io.Closeable
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration

/** Creates the [Metrics] instance used for a given connector configuration. */
class MetricsFactory {

  /** Returns a new [JmxMetrics] built from [config]. */
  fun createMetrics(config: Neo4jConfiguration): Metrics = JmxMetrics(config)
}

/**
 * A closeable registry that gauges can be added to.
 */
interface Metrics : Closeable {
/**
 * Adds a gauge.
 *
 * @param name name of the gauge
 * @param description human-readable description of the gauge
 * @param tags tags associated with the gauge
 * @param valueProvider supplies the gauge value; may return `null`
 */
fun <T : Number> addGauge(
name: String,
description: String,
tags: LinkedHashMap<String, String>,
valueProvider: () -> T?,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ neo4j.cdc.use-leader=Type: Boolean;\nDescription: Whether to use leader for chan
neo4j.cdc.poll-duration=Type: Duration;\nDescription: Maximum amount of time Kafka Connect poll request will wait for a change to be received from the database (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
neo4j.cdc.poll-interval=Type: Duration;\nDescription: The interval in which the database will be polled for changes (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
neo4j.payload-mode=Type: Enum<COMPACT, EXTENDED, RAW_JSON_STRING>;\nDescription: Defines the structure of change messages generated. `COMPACT` produces messages which are simpler but with potential schema compatibility and type safety issues, while `EXTENDED` produces messages with extra type information which removes the limitations of `COMPACT` mode. For example, a property type change will lead to schema compatibility failures in `COMPACT` mode. `RAW_JSON_STRING` mode generates messages which are serialized as raw JSON strings. Default is `EXTENDED`.
neo4j.cdc.metric.last-tx-id.enabled=Type: Boolean;\nDescription: Whether to enable the `last_db_tx_id` metric.
neo4j.cdc.metric.last-tx-id.refresh-interval=Type: Duration;\nDescription: Interval between metric updates (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`). Default value is 30 seconds.

Loading