Merged
Conversation
# Conflicts: # sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt # sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaEventTransformer.kt # sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSourceIdEventTransformer.kt # sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcHandler.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/SinkConfigurationTest.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSchemaHandlerIT.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/CdcSourceIdHandlerIT.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSchemaHandlerTest.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSourceIdHandlerTest.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/batch/BatchedCdcSchemaHandlerTaskIT.kt # sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/batch/BatchedCdcSourceIdHandlerTaskIT.kt
ali-ince
requested changes
Feb 26, 2026
Contributor
ali-ince
left a comment
There was a problem hiding this comment.
Looks good, had a few change suggestions.
common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsData.kt
Outdated
Show resolved
Hide resolved
common/src/main/resources/neo4j-source-configuration.properties
Outdated
Show resolved
Hide resolved
common/src/main/resources/neo4j-source-configuration.properties
Outdated
Show resolved
Hide resolved
common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsData.kt
Outdated
Show resolved
Hide resolved
source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt
Outdated
Show resolved
Hide resolved
source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt
Outdated
Show resolved
Hide resolved
* update descriptions * update property keys * exception handling for last db tx is metric
ali-ince
approved these changes
Feb 27, 2026
Contributor
ali-ince
left a comment
There was a problem hiding this comment.
I only have a couple of very minor change suggestions.
common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/DbTransactionMetricsData.kt
Outdated
Show resolved
Hide resolved
common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/CdcMetricsData.kt
Outdated
Show resolved
Hide resolved
common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/CdcMetricsData.kt
Outdated
Show resolved
Hide resolved
common/src/main/kotlin/org/neo4j/connectors/kafka/metrics/CdcMetricsData.kt
Outdated
Show resolved
Hide resolved
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
We need to report custom connector metrics to track a lag between the last processed by the CDC task transaction id and the last transaction id in the database.
Solution
Metrics export destination
The New Plugin Metrics API was introduced only with Kafka 4.1.
The new API will rely on
metric.reportersto define a destination for metrics, which can be JMX or other monitoring solutions like DataDog, Prometheus, etc.However, Kafka Connect 4.0+ is not compatible with JVM 11. Thus, we can only include it into next major release.
For 3.x Kafka Connect, I've implemented a JMX metrics integration. It emulated how metrics are exported to JMX via Plugin Metrics to be forward compatible. As a limitation, it will only work in the use cases, when customers have remote access to the Connect server and won't work with cloud integrations and customer reporters.
Last transaction id committed to a database.
The solution suggested here is a cache with a fixed interval, that is updated via a
SHOW DATABASE ..query.As a side effect in the cases of fast source connect polling and a low number of transactions, the last db transaction id may fall behind CDC processed transaction id for a time < refreshPeriod.
Testing
So far tested localy:
