Skip to content

Commit d6df239

Browse files
Emrehzl94, ali-ince, fbiville
authored
feat: error handling in sink connector (#142)
* refactor: rework cud handling
* style: add license headers
* refactor: use type casting instead of deserialization
* feat: relate change queries to messages

  Co-authored-by: Emre Hizal <emre.hizal@neo4j.com>
  Co-authored-by: Florent Biville <florent.biville@neo4j.com>
* test: set up test environment for error testing
* test: start to add test cases for error handling
* test: add error headers and more test cases
* refactor: add data classes to pass down the event message
* test: add more test cases for error handling
* test: fix sink registration test
* refactor: implement requested changes.
* fix: remove unnecessary handler
* test: fix failing tests
* refactor: implement requested changes

---------

Co-authored-by: Ali Ince <ali.ince@neo4j.com>
Co-authored-by: Florent Biville <florent.biville@neo4j.com>
1 parent 730b465 commit d6df239

File tree

26 files changed

+2457
-697
lines changed

26 files changed

+2457
-697
lines changed

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkErrorIT.kt

Lines changed: 920 additions & 0 deletions
Large diffs are not rendered by default.

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTask.kt

Lines changed: 26 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -48,15 +48,32 @@ class Neo4jSinkTask : SinkTask() {
4848
?.map { SinkMessage(it) }
4949
?.groupBy { it.topic }
5050
?.mapKeys { topicHandlers.getValue(it.key) }
51-
?.forEach { (handler, messages) ->
52-
val txGroups = handler.handle(messages)
53-
54-
txGroups.forEach { group ->
55-
config.session().use { session ->
56-
session.writeTransaction(
57-
{ tx -> group.forEach { tx.run(it.query).consume() } }, config.txConfig())
58-
}
59-
}
51+
?.forEach { (handler, messages) -> processMessages(handler, messages) }
52+
}
53+
54+
private fun processMessages(handler: SinkStrategyHandler, messages: List<SinkMessage>) {
55+
val handled = mutableSetOf<SinkMessage>()
56+
try {
57+
val txGroups = handler.handle(messages)
58+
59+
txGroups.forEach { group ->
60+
config.session().use { session ->
61+
session.writeTransaction(
62+
{ tx -> group.forEach { tx.run(it.query).consume() } },
63+
config.txConfig(),
64+
)
6065
}
66+
67+
handled.addAll(group.flatMap { it.messages })
68+
}
69+
} catch (e: Throwable) {
70+
val unhandled = messages.minus(handled)
71+
72+
if (unhandled.size > 1) {
73+
unhandled.forEach { m -> processMessages(handler, listOf(m)) }
74+
} else {
75+
unhandled.forEach { m -> context.errantRecordReporter()?.report(m.record, e)?.get() }
76+
}
77+
}
6178
}
6279
}

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,12 @@ enum class SinkStrategy(val description: String) {
102102
RELATIONSHIP_PATTERN("relationship-pattern")
103103
}
104104

105-
data class ChangeQuery(val txId: Long?, val seq: Int?, val query: Query)
105+
data class ChangeQuery(
106+
val txId: Long?,
107+
val seq: Int?,
108+
val messages: Iterable<SinkMessage>,
109+
val query: Query
110+
)
106111

107112
interface SinkStrategyHandler {
108113

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcHandler.kt

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -35,19 +35,21 @@ import org.slf4j.LoggerFactory
3535
abstract class CdcHandler : SinkStrategyHandler {
3636
private val logger: Logger = LoggerFactory.getLogger(javaClass)
3737

38+
data class MessageToEvent(val message: SinkMessage, val changeEvent: ChangeEvent)
39+
3840
override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
3941
return messages
4042
.onEach { logger.trace("received message: {}", it) }
41-
.map { it.toChangeEvent() }
42-
.map { it.txId to it }
43-
.onEach { logger.trace("converted message: {} to {}", it.first, it.second) }
43+
.map { MessageToEvent(it, it.toChangeEvent()) }
44+
.onEach { logger.trace("converted message: {} to {}", it.changeEvent.txId, it.changeEvent) }
4445
.groupBy(
45-
{ it.first },
46+
{ it.changeEvent.txId },
4647
{
4748
ChangeQuery(
48-
it.second.txId,
49-
it.second.seq,
50-
when (val event = it.second.event) {
49+
it.changeEvent.txId,
50+
it.changeEvent.seq,
51+
listOf(it.message),
52+
when (val event = it.changeEvent.event) {
5153
is NodeEvent ->
5254
when (event.operation) {
5355
EntityOperation.CREATE -> transformCreate(event)

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandler.kt

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy
1818

1919
import org.neo4j.cdc.client.model.NodeEvent
2020
import org.neo4j.cdc.client.model.RelationshipEvent
21+
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
2122
import org.neo4j.connectors.kafka.sink.SinkStrategy
2223
import org.neo4j.cypherdsl.core.Cypher
2324
import org.neo4j.cypherdsl.core.Node
@@ -30,6 +31,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
3031
override fun strategy() = SinkStrategy.CDC_SCHEMA
3132

3233
override fun transformCreate(event: NodeEvent): Query {
34+
if (event.after == null) {
35+
throw InvalidDataException("create operation requires 'after' field in the event object")
36+
}
37+
3338
val node = buildNode(event.keys, "n")
3439
val stmt =
3540
Cypher.merge(node)
@@ -48,6 +53,13 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
4853
}
4954

5055
override fun transformUpdate(event: NodeEvent): Query {
56+
if (event.before == null) {
57+
throw InvalidDataException("update operation requires 'before' field in the event object")
58+
}
59+
if (event.after == null) {
60+
throw InvalidDataException("update operation requires 'after' field in the event object")
61+
}
62+
5163
val node = buildNode(event.keys, "n")
5264
val stmt =
5365
Cypher.merge(node)
@@ -81,6 +93,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
8193
}
8294

8395
override fun transformCreate(event: RelationshipEvent): Query {
96+
if (event.after == null) {
97+
throw InvalidDataException("create operation requires 'after' field in the event object")
98+
}
99+
84100
val (start, end, rel) = buildRelationship(event, "r")
85101
val stmt =
86102
Cypher.merge(start)
@@ -93,6 +109,13 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
93109
}
94110

95111
override fun transformUpdate(event: RelationshipEvent): Query {
112+
if (event.before == null) {
113+
throw InvalidDataException("update operation requires 'before' field in the event object")
114+
}
115+
if (event.after == null) {
116+
throw InvalidDataException("update operation requires 'after' field in the event object")
117+
}
118+
96119
val (start, end, rel) = buildRelationship(event, "r")
97120
val stmt =
98121
Cypher.merge(start)

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandler.kt

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy
1818

1919
import org.neo4j.cdc.client.model.NodeEvent
2020
import org.neo4j.cdc.client.model.RelationshipEvent
21+
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
2122
import org.neo4j.connectors.kafka.sink.SinkStrategy
2223
import org.neo4j.cypherdsl.core.Cypher
2324
import org.neo4j.cypherdsl.core.Node
@@ -35,6 +36,10 @@ class CdcSourceIdHandler(
3536
override fun strategy() = SinkStrategy.CDC_SOURCE_ID
3637

3738
override fun transformCreate(event: NodeEvent): Query {
39+
if (event.after == null) {
40+
throw InvalidDataException("create operation requires 'after' field in the event object")
41+
}
42+
3843
val node = buildNode(event.elementId, "n")
3944
val stmt =
4045
Cypher.merge(node)
@@ -52,6 +57,13 @@ class CdcSourceIdHandler(
5257
}
5358

5459
override fun transformUpdate(event: NodeEvent): Query {
60+
if (event.before == null) {
61+
throw InvalidDataException("update operation requires 'before' field in the event object")
62+
}
63+
if (event.after == null) {
64+
throw InvalidDataException("update operation requires 'after' field in the event object")
65+
}
66+
5567
val node = buildNode(event.elementId, "n")
5668
val stmt =
5769
Cypher.merge(node)
@@ -85,6 +97,10 @@ class CdcSourceIdHandler(
8597
}
8698

8799
override fun transformCreate(event: RelationshipEvent): Query {
100+
if (event.after == null) {
101+
throw InvalidDataException("create operation requires 'after' field in the event object")
102+
}
103+
88104
val (start, end, rel) = buildRelationship(event, "r")
89105
val stmt =
90106
Cypher.merge(start)
@@ -97,6 +113,13 @@ class CdcSourceIdHandler(
97113
}
98114

99115
override fun transformUpdate(event: RelationshipEvent): Query {
116+
if (event.before == null) {
117+
throw InvalidDataException("update operation requires 'before' field in the event object")
118+
}
119+
if (event.after == null) {
120+
throw InvalidDataException("update operation requires 'after' field in the event object")
121+
}
122+
100123
val (start, end, rel) = buildRelationship(event, "r")
101124
val stmt =
102125
Cypher.merge(start)

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CudHandler.kt

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,15 +23,19 @@ import org.neo4j.connectors.kafka.sink.SinkStrategy
2323
import org.neo4j.connectors.kafka.sink.SinkStrategyHandler
2424
import org.neo4j.connectors.kafka.sink.strategy.cud.Operation
2525
import org.neo4j.cypherdsl.core.renderer.Renderer
26+
import org.neo4j.driver.Query
2627
import org.slf4j.Logger
2728
import org.slf4j.LoggerFactory
2829

2930
class CudHandler(val topic: String, private val renderer: Renderer, private val batchSize: Int) :
3031
SinkStrategyHandler {
32+
3133
private val logger: Logger = LoggerFactory.getLogger(javaClass)
3234

3335
override fun strategy() = SinkStrategy.CUD
3436

37+
data class MessageToQuery(val message: SinkMessage, val query: Query)
38+
3539
@Suppress("UNCHECKED_CAST")
3640
override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
3741
return messages
@@ -44,10 +48,10 @@ class CudHandler(val topic: String, private val renderer: Renderer, private val
4448
else -> throw ConnectException("Message value must be convertible to a Map.")
4549
}
4650
val cud = Operation.from(value)
47-
cud.toQuery(renderer)
51+
MessageToQuery(it, cud.toQuery(renderer))
4852
}
4953
.chunked(batchSize)
50-
.map { it.map { q -> ChangeQuery(null, null, q) } }
54+
.map { it.map { data -> ChangeQuery(null, null, listOf(data.message), data.query) } }
5155
.onEach { logger.trace("mapped messages: '{}'", it) }
5256
.toList()
5357
}

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/CypherHandler.kt

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,8 @@ class CypherHandler(
8181

8282
override fun strategy() = SinkStrategy.CYPHER
8383

84+
data class MessageToEventMap(val message: SinkMessage, val eventMap: Map<String, Any?>)
85+
8486
override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
8587
return messages
8688
.asSequence()
@@ -96,10 +98,17 @@ class CypherHandler(
9698

9799
logger.trace("message '{}' mapped to: '{}'", it, mapped)
98100

99-
mapped
101+
MessageToEventMap(it, mapped)
100102
}
101103
.chunked(batchSize)
102-
.map { listOf(ChangeQuery(null, null, Query(rewrittenQuery, mapOf("events" to it)))) }
104+
.map {
105+
listOf(
106+
ChangeQuery(
107+
null,
108+
null,
109+
it.map { data -> data.message },
110+
Query(rewrittenQuery, mapOf("events" to it.map { data -> data.eventMap }))))
111+
}
103112
.onEach { logger.trace("mapped messages: '{}'", it) }
104113
.toList()
105114
}

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,8 @@ class NodePatternHandler(
6565

6666
override fun strategy() = SinkStrategy.NODE_PATTERN
6767

68+
data class MessageToEventList(val message: SinkMessage, val eventList: List<Any>)
69+
6870
override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
6971
return messages
7072
.asSequence()
@@ -87,10 +89,17 @@ class NodePatternHandler(
8789

8890
logger.trace("message '{}' mapped to: '{}'", it, mapped)
8991

90-
mapped
92+
MessageToEventList(it, mapped)
9193
}
9294
.chunked(batchSize)
93-
.map { listOf(ChangeQuery(null, null, Query(query, mapOf(EVENTS to it)))) }
95+
.map {
96+
listOf(
97+
ChangeQuery(
98+
null,
99+
null,
100+
it.map { data -> data.message },
101+
Query(query, mapOf(EVENTS to it.map { data -> data.eventList }))))
102+
}
94103
.onEach { logger.trace("mapped messages: '{}'", it) }
95104
.toList()
96105
}

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt

Lines changed: 24 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.sink.strategy
1919
import java.time.Instant
2020
import java.time.ZoneOffset
2121
import org.apache.kafka.connect.errors.ConnectException
22+
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
2223
import org.neo4j.connectors.kafka.sink.SinkConfiguration
2324
import org.neo4j.connectors.kafka.sink.SinkMessage
2425
import org.neo4j.connectors.kafka.sink.SinkStrategyHandler
@@ -96,17 +97,31 @@ abstract class PatternHandler<T : Pattern>(
9697
.mapValues { (_, mapping) ->
9798
if (isExplicit(mapping.from)) {
9899
val newKey = if (isTombstone) replaceValueWithKey(mapping.from) else mapping.from
99-
usedTracker += newKey
100-
return@mapValues flattened[newKey]
101-
}
102-
103-
for (prefix in prefixes) {
104-
val key = "$prefix.${mapping.from}"
100+
if (flattened.containsKey(newKey)) {
101+
usedTracker += newKey
102+
return@mapValues flattened[newKey]
103+
}
104+
if (mapping.from.startsWith("$bindKeyAs.")) {
105+
throw InvalidDataException(
106+
"Key '${mapping.from.replace("$bindKeyAs.", "")}' could not be located in the keys.",
107+
)
108+
} else {
109+
throw InvalidDataException(
110+
"Key '${mapping.from.replace("$bindValueAs.", "")}' could not be located in the values.",
111+
)
112+
}
113+
} else {
114+
for (prefix in prefixes) {
115+
val key = "$prefix.${mapping.from}"
105116

106-
if (flattened.containsKey(key)) {
107-
usedTracker += key
108-
return@mapValues flattened[key]
117+
if (flattened.containsKey(key)) {
118+
usedTracker += key
119+
return@mapValues flattened[key]
120+
}
109121
}
122+
throw InvalidDataException(
123+
"Key '${mapping.from}' could not be located in the message.",
124+
)
110125
}
111126
}
112127

0 commit comments

Comments
 (0)