Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should report an error with all error headers when headers enabled`(
fun `should report an error with all error headers when headers are enabled`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,

session: Session
Expand Down Expand Up @@ -162,7 +162,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should report errors when cypher strategy with multiple events`(
fun `should report failed events with cypher strategy`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session,
Expand Down Expand Up @@ -244,7 +244,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should report errors when node pattern strategy with multiple events`(
fun `should report failed events with node pattern strategy`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session,
Expand Down Expand Up @@ -297,9 +297,9 @@ abstract class Neo4jSinkErrorIT {
val errorHeaders = ErrorHeaders(it.raw.headers())
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 3
errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe
"org.neo4j.driver.exceptions.ClientException"
"org.neo4j.connectors.kafka.exceptions.InvalidDataException"
errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe
"Unable to convert kotlin.Unit to Neo4j Value."
"Key 'id' could not be located in the message."

it.value shouldBe message4ToFail.value
}
Expand All @@ -317,7 +317,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should report errors when relationship pattern strategy with multiple events`(
fun `should report failed events with relationship pattern strategy`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session,
Expand Down Expand Up @@ -418,9 +418,9 @@ abstract class Neo4jSinkErrorIT {
val errorHeaders = ErrorHeaders(it.raw.headers())
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 4
errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe
"org.neo4j.driver.exceptions.ClientException"
"org.neo4j.connectors.kafka.exceptions.InvalidDataException"
errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe
"Unable to convert kotlin.Unit to Neo4j Value."
"Key 'itemId' could not be located in the message."

it.value shouldBe message5ToFail.value
}
Expand All @@ -430,7 +430,7 @@ abstract class Neo4jSinkErrorIT {
@Neo4jSink(
cud = [CudStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, errorDlqContextHeadersEnable = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it's just me but I have a hard time parsing errorDlqContextHeadersEnable 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestions for renaming? Yes, I know it's hard to grab at first glance but I just followed same pattern with actual configuration name errors.deadletterqueue.context.headers.enable 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about enableErrorHeaders ?

@Test
fun `should report errors when cud strategy with multiple events`(
fun `should report failed events with cud strategy`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session,
Expand Down Expand Up @@ -548,7 +548,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should report errors when cdc schema strategy with multiple events`(
fun `should report failed events with cdc schema strategy`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session
Expand Down Expand Up @@ -661,7 +661,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should report errors when cdc source id strategy with multiple events`(
fun `should report failed events with cdc source id strategy`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session
Expand Down Expand Up @@ -782,7 +782,7 @@ abstract class Neo4jSinkErrorIT {
errorTolerance = "none",
errorDlqTopic = DLQ_TOPIC)
@Test
fun `should report an error when tolerance none with multiple events`(
fun `should stop the process and only report first failed event when error tolerance is none`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session,
Expand All @@ -798,7 +798,7 @@ abstract class Neo4jSinkErrorIT {
val message3 =
KafkaMessage(
valueSchema = Schema.STRING_SCHEMA,
value = """{"id": 3, "name": "Mary", "surname": "Doe"}""")
value = """{"id": 3, name: "Mary", "surname": "Doe"}""")

producer.publish(message1, message2ToFail, message3)

Expand Down Expand Up @@ -829,7 +829,7 @@ abstract class Neo4jSinkErrorIT {
errorDlqTopic = DLQ_TOPIC,
errorDlqContextHeadersEnable = true)
@Test
fun `should be able to report from different topics with headers`(
fun `should report failed events from different topics`(
@TopicProducer(TOPIC_1) producer1: ConvertingKafkaProducer,
@TopicProducer(TOPIC_2) producer2: ConvertingKafkaProducer,
@TopicProducer(TOPIC_3) producer3: ConvertingKafkaProducer,
Expand Down Expand Up @@ -885,9 +885,9 @@ abstract class Neo4jSinkErrorIT {
val errorHeaders = ErrorHeaders(it.raw.headers())
errorHeaders.getValue(ErrorHeaders.TOPIC) shouldBe producer3.topic
errorHeaders.getValue(ErrorHeaders.EXCEPTION_CLASS_NAME) shouldBe
"org.neo4j.driver.exceptions.ClientException"
"org.neo4j.connectors.kafka.exceptions.InvalidDataException"
errorHeaders.getValue(ErrorHeaders.EXCEPTION_MESSAGE) shouldBe
"Unable to convert kotlin.Unit to Neo4j Value."
"Key 'id' could not be located in the message."

it.value shouldBe nodePatternMessageToFail.value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy

import org.neo4j.cdc.client.model.NodeEvent
import org.neo4j.cdc.client.model.RelationshipEvent
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
import org.neo4j.connectors.kafka.sink.SinkStrategy
import org.neo4j.cypherdsl.core.Cypher
import org.neo4j.cypherdsl.core.Node
Expand All @@ -30,6 +31,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
override fun strategy() = SinkStrategy.CDC_SCHEMA

override fun transformCreate(event: NodeEvent): Query {
if (event.after == null) {
throw InvalidDataException("create operation requires 'after' field in the event object")
}

val node = buildNode(event.keys, "n")
val stmt =
Cypher.merge(node)
Expand All @@ -49,10 +54,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH

override fun transformUpdate(event: NodeEvent): Query {
if (event.before == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also have a similar validation for create and delete events, maybe in a follow-up PR. wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think we should, since they can also fail if before or after is null

throw IllegalArgumentException("update operation requires 'before' field in the event object")
throw InvalidDataException("update operation requires 'before' field in the event object")
}
if (event.after == null) {
throw IllegalArgumentException("update operation requires 'after' field in the event object")
throw InvalidDataException("update operation requires 'after' field in the event object")
}

val node = buildNode(event.keys, "n")
Expand Down Expand Up @@ -88,6 +93,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
}

override fun transformCreate(event: RelationshipEvent): Query {
if (event.after == null) {
throw InvalidDataException("create operation requires 'after' field in the event object")
}

val (start, end, rel) = buildRelationship(event, "r")
val stmt =
Cypher.merge(start)
Expand All @@ -101,10 +110,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH

override fun transformUpdate(event: RelationshipEvent): Query {
if (event.before == null) {
throw IllegalArgumentException("update operation requires 'before' field in the event object")
throw InvalidDataException("update operation requires 'before' field in the event object")
}
if (event.after == null) {
throw IllegalArgumentException("update operation requires 'after' field in the event object")
throw InvalidDataException("update operation requires 'after' field in the event object")
}

val (start, end, rel) = buildRelationship(event, "r")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy

import org.neo4j.cdc.client.model.NodeEvent
import org.neo4j.cdc.client.model.RelationshipEvent
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
import org.neo4j.connectors.kafka.sink.SinkStrategy
import org.neo4j.cypherdsl.core.Cypher
import org.neo4j.cypherdsl.core.Node
Expand All @@ -35,6 +36,10 @@ class CdcSourceIdHandler(
override fun strategy() = SinkStrategy.CDC_SOURCE_ID

override fun transformCreate(event: NodeEvent): Query {
if (event.after == null) {
throw InvalidDataException("create operation requires 'after' field in the event object")
}

val node = buildNode(event.elementId, "n")
val stmt =
Cypher.merge(node)
Expand All @@ -53,10 +58,10 @@ class CdcSourceIdHandler(

override fun transformUpdate(event: NodeEvent): Query {
if (event.before == null) {
throw IllegalArgumentException("update operation requires 'before' field in the event object")
throw InvalidDataException("update operation requires 'before' field in the event object")
}
if (event.after == null) {
throw IllegalArgumentException("update operation requires 'after' field in the event object")
throw InvalidDataException("update operation requires 'after' field in the event object")
}

val node = buildNode(event.elementId, "n")
Expand Down Expand Up @@ -92,6 +97,10 @@ class CdcSourceIdHandler(
}

override fun transformCreate(event: RelationshipEvent): Query {
if (event.after == null) {
throw InvalidDataException("create operation requires 'after' field in the event object")
}

val (start, end, rel) = buildRelationship(event, "r")
val stmt =
Cypher.merge(start)
Expand All @@ -105,10 +114,10 @@ class CdcSourceIdHandler(

override fun transformUpdate(event: RelationshipEvent): Query {
if (event.before == null) {
throw IllegalArgumentException("update operation requires 'before' field in the event object")
throw InvalidDataException("update operation requires 'before' field in the event object")
}
if (event.after == null) {
throw IllegalArgumentException("update operation requires 'after' field in the event object")
throw InvalidDataException("update operation requires 'after' field in the event object")
}

val (start, end, rel) = buildRelationship(event, "r")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.sink.strategy
import java.time.Instant
import java.time.ZoneOffset
import org.apache.kafka.connect.errors.ConnectException
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
import org.neo4j.connectors.kafka.sink.SinkConfiguration
import org.neo4j.connectors.kafka.sink.SinkMessage
import org.neo4j.connectors.kafka.sink.SinkStrategyHandler
Expand Down Expand Up @@ -96,17 +97,31 @@ abstract class PatternHandler<T : Pattern>(
.mapValues { (_, mapping) ->
if (isExplicit(mapping.from)) {
val newKey = if (isTombstone) replaceValueWithKey(mapping.from) else mapping.from
usedTracker += newKey
return@mapValues flattened[newKey]
}

for (prefix in prefixes) {
val key = "$prefix.${mapping.from}"
if (flattened.containsKey(newKey)) {
usedTracker += newKey
return@mapValues flattened[newKey]
}
if (mapping.from.startsWith("$bindKeyAs.")) {
throw InvalidDataException(
"Key '${mapping.from.replace("$bindKeyAs.", "")}' could not be located in the keys.",
)
} else {
throw InvalidDataException(
"Key '${mapping.from.replace("$bindValueAs.", "")}' could not be located in the values.",
)
}
} else {
for (prefix in prefixes) {
val key = "$prefix.${mapping.from}"

if (flattened.containsKey(key)) {
usedTracker += key
return@mapValues flattened[key]
if (flattened.containsKey(key)) {
usedTracker += key
return@mapValues flattened[key]
}
}
throw InvalidDataException(
"Key '${mapping.from}' could not be located in the message.",
)
}
}

Expand Down
Loading