feat: error handling in sink connector#142
feat: error handling in sink connector#142Emrehzl94 merged 15 commits intoneo4j:mainfrom ali-ince:error-handling
Conversation
Co-authored-by: Emre Hizal <emre.hizal@neo4j.com> Co-authored-by: Florent Biville <florent.biville@neo4j.com>
ali-ince
left a comment
There was a problem hiding this comment.
Looks good. Left a couple of minor comments.
I think test names can be slightly rephrased in general.
| } | ||
|
|
||
| override fun transformUpdate(event: NodeEvent): Query { | ||
| if (event.before == null) { |
There was a problem hiding this comment.
should we also have a similar validation for create and delete events, maybe in a follow-up PR. wdyt?
There was a problem hiding this comment.
yes, I think we should, since they can also fail if before or after is null
| errorDlqTopic = DLQ_TOPIC, | ||
| errorDlqContextHeadersEnable = true) | ||
| @Test | ||
| fun `should report errors when cypher strategy with multiple events`( |
There was a problem hiding this comment.
I think the test name requires a bit of rephrasing?
There was a problem hiding this comment.
any suggestions? maybe should report failed events with cypher strategy ?
There was a problem hiding this comment.
yep, that sounds better :)
bad news is that, most of the test cases in this class requires similar renaming :)
There was a problem hiding this comment.
no worries :) not a big effort to change them :)
| errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe | ||
| "org.neo4j.driver.exceptions.ClientException" | ||
| errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe | ||
| "Unable to convert kotlin.Unit to Neo4j Value." |
There was a problem hiding this comment.
Shall we have a better error message displayed here, wdyt?
There was a problem hiding this comment.
so you mean we should prevent this error with some validation even before sending the query to the driver, if I'm not mistaken?
There was a problem hiding this comment.
No, what I mean is that we should handle this error way before we send the query to Neo4j - it will still fail but with a clear error message about what's wrong.
| errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe | ||
| "org.neo4j.driver.exceptions.ClientException" | ||
| errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe | ||
| "Unable to convert kotlin.Unit to Neo4j Value." |
| TopicVerifier.create<String, String>(consumer) | ||
| .assertMessageValue { it shouldBe message2ToFail.value } | ||
| .verifyWithin(Duration.ofSeconds(30)) | ||
| } |
There was a problem hiding this comment.
maybe for a future improvement but we could also verify that the connector instance is in FAILED state.
There was a problem hiding this comment.
should I create a card for this?
# Conflicts: # sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RedirectingHandler.kt
| @Test | ||
| fun `should split changes into transactional boundaries`() { | ||
| val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig())) | ||
| val handler = |
| @Test | ||
| fun `should report an error with all error headers when headers are enabled`( | ||
| @TopicProducer(TOPIC) producer: ConvertingKafkaProducer, | ||
| @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer, |
There was a problem hiding this comment.
| @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer, | |
| @TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer, |
| ) { | ||
| session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume() | ||
|
|
||
| val schema = |
There was a problem hiding this comment.
| val schema = | |
| val schemaWithMissingSurname = |
| "org.neo4j.driver.exceptions.ClientException" | ||
| errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe | ||
| """Cannot merge the following node because of null property value for 'surname': (:Person {surname: null})""" | ||
| errorHeaders.getValue(ErrorHeaders.EXCEPTION_STACKTRACE) shouldNotBe null |
There was a problem hiding this comment.
you mean why we only check it's not null?
There was a problem hiding this comment.
I'm wondering why we don't get a stack trace
There was a problem hiding this comment.
we're getting it, just it's gonna be too many lines in the test to put the all stack trace here and check they are equal, this is why I didn't put it and only check it's not null, maybe instead of all stack trace I can put shouldContain check
sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkErrorIT.kt
Show resolved
Hide resolved
| } | ||
|
|
||
| @Neo4jSink( | ||
| cud = [CudStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, errorDlqContextHeadersEnable = true) |
There was a problem hiding this comment.
maybe it's just me but I have a hard time parsing errorDlqContextHeadersEnable 😅
There was a problem hiding this comment.
Any suggestions for renaming? Yes, I know it's hard to grab at first glance but I just followed same pattern with actual configuration name errors.deadletterqueue.context.headers.enable 😄
There was a problem hiding this comment.
based on my quick reading of https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/#92942d30-20e8-4c52-80f5-924b639e639b, something like includeErrorMessage or sth?
There was a problem hiding this comment.
what about enableErrorHeaders ?
| handler.handle(listOf(nodeChangeEventMessage)) | ||
| } shouldHaveMessage "create operation requires 'after' field in the event object" | ||
|
|
||
| val relationshipChangeEventMessage = |
There was a problem hiding this comment.
I'd personally move this to a separate test
| handler.handle(listOf(nodeChangeEventMessage)) | ||
| } shouldHaveMessage "update operation requires 'after' field in the event object" | ||
|
|
||
| val relationshipChangeEventMessage = |
| handler.handle(listOf(nodeChangeEventMessage)) | ||
| } shouldHaveMessage "create operation requires 'after' field in the event object" | ||
|
|
||
| val relationshipChangeEventMessage = |
| handler.handle(listOf(nodeChangeEventMessage)) | ||
| } shouldHaveMessage "update operation requires 'before' field in the event object" | ||
|
|
||
| val relationshipChangeEventMessage = |
| handler.handle(listOf(nodeChangeEventMessage)) | ||
| } shouldHaveMessage "update operation requires 'after' field in the event object" | ||
|
|
||
| val relationshipChangeEventMessage = |
ali-ince
left a comment
There was a problem hiding this comment.
Looks good, I think this is now ready to merged 👍
This pr includes error handling mechanism for sink connector and its integration tests. With this pr, now we are able to send error information to a configured DLQ for each sink message.