Skip to content

Commit e0cb9e6

Browse files
authored
feat: cdc metrics (#518)
1 parent da86a73 commit e0cb9e6

File tree

24 files changed

+879
-75
lines changed

24 files changed

+879
-75
lines changed

common/src/main/kotlin/org/neo4j/connectors/kafka/configuration/Neo4jConfiguration.kt

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.io.Closeable
2020
import java.io.File
2121
import java.net.URI
2222
import java.util.concurrent.TimeUnit
23+
import kotlin.time.Duration
2324
import kotlin.time.Duration.Companion.milliseconds
2425
import kotlin.time.Duration.Companion.seconds
2526
import org.apache.kafka.common.config.AbstractConfig
@@ -70,33 +71,29 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
7071
get(): List<URI> = getList(URI).map { URI(it) }
7172

7273
internal val connectionTimeout
73-
get(): kotlin.time.Duration =
74-
kotlin.time.Duration.parseSimpleString(getString(CONNECTION_TIMEOUT))
74+
get(): Duration = Duration.parseSimpleString(getString(CONNECTION_TIMEOUT))
7575

7676
internal val maxRetryTime
77-
get(): kotlin.time.Duration =
78-
kotlin.time.Duration.parseSimpleString(getString(MAX_TRANSACTION_RETRY_TIMEOUT))
77+
get(): Duration = Duration.parseSimpleString(getString(MAX_TRANSACTION_RETRY_TIMEOUT))
7978

8079
internal val maxConnectionPoolSize
8180
get(): Int = getInt(POOL_MAX_CONNECTION_POOL_SIZE)
8281

8382
internal val connectionAcquisitionTimeout
84-
get(): kotlin.time.Duration =
85-
kotlin.time.Duration.parseSimpleString(getString(POOL_CONNECTION_ACQUISITION_TIMEOUT))
83+
get(): Duration = Duration.parseSimpleString(getString(POOL_CONNECTION_ACQUISITION_TIMEOUT))
8684

8785
internal val idleTimeBeforeTest
88-
get(): kotlin.time.Duration =
86+
get(): Duration =
8987
getString(POOL_IDLE_TIME_BEFORE_TEST).orEmpty().run {
9088
if (this.isEmpty()) {
9189
(-1).milliseconds
9290
} else {
93-
kotlin.time.Duration.parseSimpleString(this)
91+
Duration.parseSimpleString(this)
9492
}
9593
}
9694

9795
internal val maxConnectionLifetime
98-
get(): kotlin.time.Duration =
99-
kotlin.time.Duration.parseSimpleString(getString(POOL_MAX_CONNECTION_LIFETIME))
96+
get(): Duration = Duration.parseSimpleString(getString(POOL_MAX_CONNECTION_LIFETIME))
10097

10198
internal val encrypted
10299
get(): Boolean = getString(SECURITY_ENCRYPTED).toBoolean()
@@ -239,6 +236,12 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
239236
}
240237
}
241238

239+
val connectorName
240+
get(): String = originals()[CONNECTOR_NAME].toString()
241+
242+
val taskId
243+
get(): String = originals()[TASK_ID].toString()
244+
242245
companion object {
243246
val DEFAULT_MAX_RETRY_DURATION = 30.seconds
244247

@@ -271,6 +274,10 @@ open class Neo4jConfiguration(configDef: ConfigDef, originals: Map<*, *>, val ty
271274
const val SECURITY_TRUST_STRATEGY = "neo4j.security.trust-strategy"
272275
const val SECURITY_CERT_FILES = "neo4j.security.cert-files"
273276

277+
// internal properties
278+
const val CONNECTOR_NAME = "name"
279+
const val TASK_ID = "neo4j.task.id"
280+
274281
/** Perform validation on dependent configuration items */
275282
fun validate(config: org.apache.kafka.common.config.Config) {
276283
// authentication configuration
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.metrics
18+
19+
import java.util.concurrent.atomic.AtomicLong
20+
import org.neo4j.cdc.client.model.ChangeEvent
21+
import org.neo4j.connectors.kafka.configuration.ConnectorType
22+
23+
class CdcMetricsData(
24+
metrics: Metrics,
25+
connectorType: ConnectorType,
26+
tags: LinkedHashMap<String, String> = linkedMapOf(),
27+
) {
28+
29+
private var lastTxCommitTs: AtomicLong = AtomicLong(0L)
30+
private var lastTxStartTs: AtomicLong = AtomicLong(0L)
31+
private var lastTxId: AtomicLong = AtomicLong(0L)
32+
33+
init {
34+
metrics.addGauge(
35+
"last_cdc_tx_commit_timestamp",
36+
"The transaction commit timestamp of the last ${connectorType.descriptionActionVerb()} CDC message",
37+
tags,
38+
) {
39+
lastTxCommitTs.get()
40+
}
41+
metrics.addGauge(
42+
"last_cdc_tx_start_timestamp",
43+
"The transaction start timestamp of the last ${connectorType.descriptionActionVerb()} CDC message",
44+
tags,
45+
) {
46+
lastTxStartTs.get()
47+
}
48+
metrics.addGauge(
49+
"last_cdc_tx_id",
50+
"The transaction id of the last ${connectorType.descriptionActionVerb()} CDC message",
51+
tags,
52+
) {
53+
lastTxId.get()
54+
}
55+
}
56+
57+
fun update(event: ChangeEvent) {
58+
event.metadata?.let {
59+
lastTxCommitTs.set(it.txCommitTime.toEpochSecond())
60+
lastTxStartTs.set(it.txStartTime.toEpochSecond())
61+
}
62+
lastTxId.set(event.txId)
63+
}
64+
65+
companion object {
66+
private fun ConnectorType.descriptionActionVerb(): String =
67+
when (this) {
68+
ConnectorType.SOURCE -> "polled"
69+
ConnectorType.SINK -> "pushed"
70+
}
71+
}
72+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.metrics
18+
19+
import java.io.Closeable
20+
import java.util.concurrent.atomic.AtomicLong
21+
import kotlin.time.Duration
22+
import kotlinx.coroutines.CoroutineDispatcher
23+
import kotlinx.coroutines.CoroutineScope
24+
import kotlinx.coroutines.Dispatchers
25+
import kotlinx.coroutines.Job
26+
import kotlinx.coroutines.cancel
27+
import kotlinx.coroutines.delay
28+
import kotlinx.coroutines.isActive
29+
import kotlinx.coroutines.launch
30+
import org.neo4j.driver.Driver
31+
import org.neo4j.driver.TransactionConfig
32+
import org.slf4j.Logger
33+
import org.slf4j.LoggerFactory
34+
35+
class DbTransactionMetricsData(
36+
metrics: Metrics,
37+
tags: LinkedHashMap<String, String> = linkedMapOf(),
38+
refreshInterval: Duration,
39+
neo4jDriver: Driver,
40+
transactionConfig: TransactionConfig,
41+
databaseName: String,
42+
dispatcher: CoroutineDispatcher = Dispatchers.Default,
43+
) : Closeable {
44+
45+
private val lastTransactionId = AtomicLong(0)
46+
private val scope = CoroutineScope(dispatcher + Job())
47+
48+
init {
49+
metrics.addGauge("last_db_tx_id", "The last committed transaction id in the database", tags) {
50+
lastTransactionId.get()
51+
}
52+
53+
scope.launch {
54+
while (isActive) {
55+
try {
56+
val explicitDatabaseName = databaseName.ifBlank { "neo4j" }
57+
val txId: Long =
58+
neo4jDriver.session().use { session ->
59+
session.writeTransaction(
60+
{ tx ->
61+
tx.run(
62+
"SHOW DATABASE ${"$"}dbName YIELD lastCommittedTxn RETURN lastCommittedTxn as txId",
63+
mapOf("dbName" to explicitDatabaseName),
64+
)
65+
.single()
66+
.get("txId")
67+
.asLong()
68+
},
69+
transactionConfig,
70+
)
71+
}
72+
lastTransactionId.set(txId)
73+
} catch (e: Throwable) {
74+
log.warn("Unexpected error occurred while fetching last committed transaction id", e)
75+
}
76+
77+
delay(refreshInterval)
78+
}
79+
}
80+
}
81+
82+
override fun close() {
83+
scope.cancel()
84+
}
85+
86+
companion object {
87+
private val log: Logger = LoggerFactory.getLogger(DbTransactionMetricsData::class.java)
88+
}
89+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.metrics
18+
19+
import java.lang.management.ManagementFactory
20+
import java.util.Hashtable
21+
import java.util.concurrent.ConcurrentHashMap
22+
import javax.management.Attribute
23+
import javax.management.AttributeList
24+
import javax.management.DynamicMBean
25+
import javax.management.MBeanAttributeInfo
26+
import javax.management.MBeanInfo
27+
import javax.management.ObjectName
28+
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration
29+
30+
class JmxMetrics(config: Neo4jConfiguration) : Metrics, DynamicMBean {
31+
32+
private val connectorName: String = config.connectorName
33+
private val taskId: String = config.taskId
34+
private val objectName: ObjectName =
35+
ObjectName(
36+
"kafka.connect",
37+
Hashtable<String, String>().apply {
38+
put("type", "plugins")
39+
put("connector", connectorName)
40+
put("task", taskId)
41+
},
42+
)
43+
44+
private val gauges = ConcurrentHashMap<String, Gauge<*>>()
45+
private val mbs = ManagementFactory.getPlatformMBeanServer()
46+
47+
init {
48+
if (mbs.isRegistered(objectName)) {
49+
mbs.unregisterMBean(objectName)
50+
}
51+
mbs.registerMBean(this, objectName)
52+
}
53+
54+
override fun <T : Number> addGauge(
55+
name: String,
56+
description: String,
57+
tags: LinkedHashMap<String, String>,
58+
valueProvider: () -> T?,
59+
) {
60+
gauges[name] = Gauge(name, description, valueProvider)
61+
}
62+
63+
override fun getAttribute(attribute: String?): Any? {
64+
return gauges[attribute]?.valueProvider?.invoke()
65+
}
66+
67+
override fun setAttribute(attribute: Attribute?) {
68+
throw UnsupportedOperationException("Attributes are read-only")
69+
}
70+
71+
override fun getAttributes(attributes: Array<out String>?): AttributeList {
72+
val list = AttributeList()
73+
attributes?.forEach { name ->
74+
getAttribute(name)?.let { value -> list.add(Attribute(name, value)) }
75+
}
76+
return list
77+
}
78+
79+
override fun setAttributes(attributes: AttributeList?): AttributeList {
80+
throw UnsupportedOperationException("Attributes are read-only")
81+
}
82+
83+
override fun invoke(
84+
actionName: String?,
85+
params: Array<out Any>?,
86+
signature: Array<out String>?,
87+
): Any {
88+
throw UnsupportedOperationException("Operations are not supported")
89+
}
90+
91+
override fun getMBeanInfo(): MBeanInfo {
92+
val attrs =
93+
gauges.values.map { gauge ->
94+
MBeanAttributeInfo(gauge.name, "java.lang.Number", gauge.description, true, false, false)
95+
}
96+
97+
return MBeanInfo(
98+
this.javaClass.name,
99+
"Neo4j Kafka Connector JMX Metrics",
100+
attrs.toTypedArray(),
101+
null,
102+
null,
103+
null,
104+
)
105+
}
106+
107+
private class Gauge<T : Number>(
108+
val name: String,
109+
val description: String,
110+
val valueProvider: () -> T?,
111+
)
112+
113+
override fun close() {
114+
if (mbs.isRegistered(objectName)) {
115+
mbs.unregisterMBean(objectName)
116+
}
117+
}
118+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.metrics
18+
19+
import java.io.Closeable
20+
import org.neo4j.connectors.kafka.configuration.Neo4jConfiguration
21+
22+
class MetricsFactory {
23+
24+
fun createMetrics(config: Neo4jConfiguration): Metrics {
25+
return JmxMetrics(config)
26+
}
27+
}
28+
29+
interface Metrics : Closeable {
30+
fun <T : Number> addGauge(
31+
name: String,
32+
description: String,
33+
tags: LinkedHashMap<String, String>,
34+
valueProvider: () -> T?,
35+
)
36+
}

common/src/main/resources/neo4j-source-configuration.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ neo4j.cdc.use-leader=Type: Boolean;\nDescription: Whether to use leader for chan
3131
neo4j.cdc.poll-duration=Type: Duration;\nDescription: Maximum amount of time Kafka Connect poll request will wait for a change to be received from the database (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
3232
neo4j.cdc.poll-interval=Type: Duration;\nDescription: The interval in which the database will be polled for changes (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`).
3333
neo4j.payload-mode=Type: Enum<COMPACT, EXTENDED, RAW_JSON_STRING>;\nDescription: Defines the structure of change messages generated. `COMPACT` produces messages which are simpler but with potential schema compatibility and type safety issues, while `EXTENDED` produces messages with extra type information which removes the limitations of `COMPACT` mode. For example, a property type change will lead to schema compatibility failures in `COMPACT` mode. `RAW_JSON_STRING` mode generates messages which are serialized as raw JSON strings. Default is `EXTENDED`.
34+
neo4j.cdc.metric.last-db-tx-id.enabled=Type: Boolean;\nDescription: Whether to enable the `last_db_tx_id` metric.
35+
neo4j.cdc.metric.last-db-tx-id.refresh-interval=Type: Duration;\nDescription: The refresh interval of the `last_db_tx_id` metric (valid units are: `ms`, `s`, `m`, `h` and `d`; default unit is `s`). The default value is 30 seconds.
36+

0 commit comments

Comments
 (0)