Skip to content

Commit 65300e8

Browse files
committed
refactor: implement requested changes
1 parent 0984c7b commit 65300e8

File tree

6 files changed: +82 additions, -42 deletions

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkErrorIT.kt

Lines changed: 27 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -111,26 +111,26 @@ abstract class Neo4jSinkErrorIT {
111111
TOPIC,
112112
"MERGE (p:Person {id: event.id, name: event.name, surname: event.surname})")],
113113
errorDlqTopic = DLQ_TOPIC,
114-
errorDlqContextHeadersEnable = true)
114+
enableErrorHeaders = true)
115115
@Test
116116
fun `should report an error with all error headers when headers are enabled`(
117117
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
118-
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
118+
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
119119
session: Session
120120
) {
121121
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
122122

123-
val schema =
123+
val schemaWithMissingSurname =
124124
SchemaBuilder.struct()
125125
.field("id", Schema.INT64_SCHEMA)
126126
.field("name", Schema.STRING_SCHEMA)
127127
.build()
128-
val struct = Struct(schema)
128+
val struct = Struct(schemaWithMissingSurname)
129129
struct.put("id", 1L)
130130
struct.put("name", "John")
131-
producer.publish(valueSchema = schema, value = struct)
131+
producer.publish(valueSchema = schemaWithMissingSurname, value = struct)
132132

133-
TopicVerifier.createForMap(consumer)
133+
TopicVerifier.createForMap(errorConsumer)
134134
.assertMessage {
135135
val errorHeaders = ErrorHeaders(it.raw.headers())
136136
errorHeaders.getValue(ErrorHeaders.TOPIC) shouldBe producer.topic
@@ -160,11 +160,11 @@ abstract class Neo4jSinkErrorIT {
160160
TOPIC,
161161
"MERGE (p:Person {id: event.id, name: event.name, surname: event.surname})")],
162162
errorDlqTopic = DLQ_TOPIC,
163-
errorDlqContextHeadersEnable = true)
163+
enableErrorHeaders = true)
164164
@Test
165165
fun `should report failed events with cypher strategy`(
166166
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
167-
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
167+
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
168168
session: Session,
169169
) = runTest {
170170
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
@@ -212,7 +212,7 @@ abstract class Neo4jSinkErrorIT {
212212
(listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe")))
213213
}
214214

215-
TopicVerifier.createForMap(consumer)
215+
TopicVerifier.createForMap(errorConsumer)
216216
.assertMessage {
217217
val errorHeaders = ErrorHeaders(it.raw.headers())
218218
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 1
@@ -242,11 +242,11 @@ abstract class Neo4jSinkErrorIT {
242242
NodePatternStrategy(
243243
TOPIC, "(:Person{!id, name, surname})", mergeNodeProperties = false)],
244244
errorDlqTopic = DLQ_TOPIC,
245-
errorDlqContextHeadersEnable = true)
245+
enableErrorHeaders = true)
246246
@Test
247247
fun `should report failed events with node pattern strategy`(
248248
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
249-
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
249+
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
250250
session: Session,
251251
) = runTest {
252252
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
@@ -282,7 +282,7 @@ abstract class Neo4jSinkErrorIT {
282282
(listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe")))
283283
}
284284

285-
TopicVerifier.create<String, String>(consumer)
285+
TopicVerifier.create<String, String>(errorConsumer)
286286
.assertMessage {
287287
val errorHeaders = ErrorHeaders(it.raw.headers())
288288
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 1
@@ -315,11 +315,11 @@ abstract class Neo4jSinkErrorIT {
315315
mergeNodeProperties = false,
316316
mergeRelationshipProperties = false)],
317317
errorDlqTopic = DLQ_TOPIC,
318-
errorDlqContextHeadersEnable = true)
318+
enableErrorHeaders = true)
319319
@Test
320320
fun `should report failed events with relationship pattern strategy`(
321321
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
322-
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
322+
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
323323
session: Session,
324324
) = runTest {
325325
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
@@ -403,7 +403,7 @@ abstract class Neo4jSinkErrorIT {
403403
}
404404
}
405405

406-
TopicVerifier.create<String, String>(consumer)
406+
TopicVerifier.create<String, String>(errorConsumer)
407407
.assertMessage {
408408
val errorHeaders = ErrorHeaders(it.raw.headers())
409409
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 2
@@ -427,12 +427,11 @@ abstract class Neo4jSinkErrorIT {
427427
.verifyWithin(Duration.ofSeconds(30))
428428
}
429429

430-
@Neo4jSink(
431-
cud = [CudStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, errorDlqContextHeadersEnable = true)
430+
@Neo4jSink(cud = [CudStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, enableErrorHeaders = true)
432431
@Test
433432
fun `should report failed events with cud strategy`(
434433
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
435-
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
434+
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
436435
session: Session,
437436
) = runTest {
438437
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
@@ -519,7 +518,7 @@ abstract class Neo4jSinkErrorIT {
519518
(listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe")))
520519
}
521520

522-
TopicVerifier.create<String, String>(consumer)
521+
TopicVerifier.create<String, String>(errorConsumer)
523522
.assertMessage {
524523
val errorHeaders = ErrorHeaders(it.raw.headers())
525524
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 0
@@ -544,13 +543,11 @@ abstract class Neo4jSinkErrorIT {
544543
}
545544

546545
@Neo4jSink(
547-
cdcSchema = [CdcSchemaStrategy(TOPIC)],
548-
errorDlqTopic = DLQ_TOPIC,
549-
errorDlqContextHeadersEnable = true)
546+
cdcSchema = [CdcSchemaStrategy(TOPIC)], errorDlqTopic = DLQ_TOPIC, enableErrorHeaders = true)
550547
@Test
551548
fun `should report failed events with cdc schema strategy`(
552549
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
553-
@TopicConsumer(DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
550+
@TopicConsumer(DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
554551
session: Session
555552
) = runTest {
556553
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
@@ -632,7 +629,7 @@ abstract class Neo4jSinkErrorIT {
632629
(listOf("Person") to mapOf("id" to 5L, "name" to "Sue", "surname" to "Doe")))
633630
}
634631

635-
TopicVerifier.create<ChangeEvent, ChangeEvent>(consumer)
632+
TopicVerifier.create<ChangeEvent, ChangeEvent>(errorConsumer)
636633
.assertMessage {
637634
val errorHeaders = ErrorHeaders(it.raw.headers())
638635
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 0
@@ -659,11 +656,11 @@ abstract class Neo4jSinkErrorIT {
659656
@Neo4jSink(
660657
cdcSourceId = [CdcSourceIdStrategy(TOPIC, "SourceEvent", "sourceId")],
661658
errorDlqTopic = DLQ_TOPIC,
662-
errorDlqContextHeadersEnable = true)
659+
enableErrorHeaders = true)
663660
@Test
664661
fun `should report failed events with cdc source id strategy`(
665662
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
666-
@TopicConsumer(DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
663+
@TopicConsumer(DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
667664
session: Session
668665
) = runTest {
669666
session.run("CREATE CONSTRAINT FOR (n:Person) REQUIRE n.id IS KEY").consume()
@@ -752,7 +749,7 @@ abstract class Neo4jSinkErrorIT {
752749
"sourceId" to "person4", "id" to 4L, "name" to "Martin", "surname" to "Doe")))
753750
}
754751

755-
TopicVerifier.create<ChangeEvent, ChangeEvent>(consumer)
752+
TopicVerifier.create<ChangeEvent, ChangeEvent>(errorConsumer)
756753
.assertMessage {
757754
val errorHeaders = ErrorHeaders(it.raw.headers())
758755
errorHeaders.getValue(ErrorHeaders.OFFSET) shouldBe 1
@@ -784,7 +781,7 @@ abstract class Neo4jSinkErrorIT {
784781
@Test
785782
fun `should stop the process and only report first failed event when error tolerance is none`(
786783
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
787-
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") consumer: ConvertingKafkaConsumer,
784+
@TopicConsumer(topic = DLQ_TOPIC, offset = "earliest") errorConsumer: ConvertingKafkaConsumer,
788785
session: Session,
789786
) = runTest {
790787
val message1 =
@@ -810,7 +807,7 @@ abstract class Neo4jSinkErrorIT {
810807
}
811808
}
812809

813-
TopicVerifier.create<String, String>(consumer)
810+
TopicVerifier.create<String, String>(errorConsumer)
814811
.assertMessageValue { it shouldBe message2ToFail.value }
815812
.verifyWithin(Duration.ofSeconds(30))
816813
}
@@ -827,7 +824,7 @@ abstract class Neo4jSinkErrorIT {
827824
NodePatternStrategy(
828825
TOPIC_3, "(:Person{!id, name, surname})", mergeNodeProperties = false)],
829826
errorDlqTopic = DLQ_TOPIC,
830-
errorDlqContextHeadersEnable = true)
827+
enableErrorHeaders = true)
831828
@Test
832829
fun `should report failed events from different topics`(
833830
@TopicProducer(TOPIC_1) producer1: ConvertingKafkaProducer,

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSchemaHandlerTest.kt

Lines changed: 18 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -577,7 +577,7 @@ class CdcSchemaHandlerTest {
577577
}
578578

579579
@Test
580-
fun `should fail on null 'after' field with create operation`() {
580+
fun `should fail on null 'after' field with node create operation`() {
581581
val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))
582582

583583
val nodeChangeEventMessage =
@@ -595,6 +595,11 @@ class CdcSchemaHandlerTest {
595595
assertThrows<InvalidDataException> {
596596
handler.handle(listOf(nodeChangeEventMessage))
597597
} shouldHaveMessage "create operation requires 'after' field in the event object"
598+
}
599+
600+
@Test
601+
fun `should fail on null 'after' field with relationship create operation`() {
602+
val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))
598603

599604
val relationshipChangeEventMessage =
600605
newChangeEventMessage(
@@ -622,7 +627,7 @@ class CdcSchemaHandlerTest {
622627
}
623628

624629
@Test
625-
fun `should fail on null 'before' field with update operation`() {
630+
fun `should fail on null 'before' field with node update operation`() {
626631
val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))
627632

628633
val nodeChangeEventMessage =
@@ -641,6 +646,11 @@ class CdcSchemaHandlerTest {
641646
assertThrows<InvalidDataException> {
642647
handler.handle(listOf(nodeChangeEventMessage))
643648
} shouldHaveMessage "update operation requires 'before' field in the event object"
649+
}
650+
651+
@Test
652+
fun `should fail on null 'before' field with relationship update operation`() {
653+
val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))
644654

645655
val relationshipChangeEventMessage =
646656
newChangeEventMessage(
@@ -668,7 +678,7 @@ class CdcSchemaHandlerTest {
668678
}
669679

670680
@Test
671-
fun `should fail on null 'after' field with update operation`() {
681+
fun `should fail on null 'after' field with node update operation`() {
672682
val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))
673683

674684
val nodeChangeEventMessage =
@@ -686,6 +696,11 @@ class CdcSchemaHandlerTest {
686696
assertThrows<InvalidDataException> {
687697
handler.handle(listOf(nodeChangeEventMessage))
688698
} shouldHaveMessage "update operation requires 'after' field in the event object"
699+
}
700+
701+
@Test
702+
fun `should fail on null 'after' field with relationship update operation`() {
703+
val handler = CdcSchemaHandler("my-topic", Renderer.getRenderer(Configuration.defaultConfig()))
689704

690705
val relationshipChangeEventMessage =
691706
newChangeEventMessage(

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/CdcSourceIdHandlerTest.kt

Lines changed: 33 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -443,7 +443,7 @@ class CdcSourceIdHandlerTest {
443443
}
444444

445445
@Test
446-
fun `should fail on null 'after' field with create operation`() {
446+
fun `should fail on null 'after' field with node create operation`() {
447447
val handler =
448448
CdcSourceIdHandler(
449449
"my-topic",
@@ -466,6 +466,16 @@ class CdcSourceIdHandlerTest {
466466
assertThrows<InvalidDataException> {
467467
handler.handle(listOf(nodeChangeEventMessage))
468468
} shouldHaveMessage "create operation requires 'after' field in the event object"
469+
}
470+
471+
@Test
472+
fun `should fail on null 'after' field with relationship create operation`() {
473+
val handler =
474+
CdcSourceIdHandler(
475+
"my-topic",
476+
Renderer.getRenderer(Configuration.defaultConfig()),
477+
"SourceEvent",
478+
"sourceElementId")
469479

470480
val relationshipChangeEventMessage =
471481
newChangeEventMessage(
@@ -493,7 +503,7 @@ class CdcSourceIdHandlerTest {
493503
}
494504

495505
@Test
496-
fun `should fail on null 'before' field with update operation`() {
506+
fun `should fail on null 'before' field with node update operation`() {
497507
val handler =
498508
CdcSourceIdHandler(
499509
"my-topic",
@@ -517,6 +527,16 @@ class CdcSourceIdHandlerTest {
517527
assertThrows<InvalidDataException> {
518528
handler.handle(listOf(nodeChangeEventMessage))
519529
} shouldHaveMessage "update operation requires 'before' field in the event object"
530+
}
531+
532+
@Test
533+
fun `should fail on null 'before' field with relationship update operation`() {
534+
val handler =
535+
CdcSourceIdHandler(
536+
"my-topic",
537+
Renderer.getRenderer(Configuration.defaultConfig()),
538+
"SourceEvent",
539+
"sourceElementId")
520540

521541
val relationshipChangeEventMessage =
522542
newChangeEventMessage(
@@ -544,7 +564,7 @@ class CdcSourceIdHandlerTest {
544564
}
545565

546566
@Test
547-
fun `should fail on null 'after' field with update operation`() {
567+
fun `should fail on null 'after' field with node update operation`() {
548568
val handler =
549569
CdcSourceIdHandler(
550570
"my-topic",
@@ -567,6 +587,16 @@ class CdcSourceIdHandlerTest {
567587
assertThrows<InvalidDataException> {
568588
handler.handle(listOf(nodeChangeEventMessage))
569589
} shouldHaveMessage "update operation requires 'after' field in the event object"
590+
}
591+
592+
@Test
593+
fun `should fail on null 'after' field with relationship update operation`() {
594+
val handler =
595+
CdcSourceIdHandler(
596+
"my-topic",
597+
Renderer.getRenderer(Configuration.defaultConfig()),
598+
"SourceEvent",
599+
"sourceElementId")
570600

571601
val relationshipChangeEventMessage =
572602
newChangeEventMessage(

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ annotation class Neo4jSink(
4343
val cud: Array<CudStrategy> = [],
4444
val errorTolerance: String = "all",
4545
val errorDlqTopic: String = "",
46-
val errorDlqContextHeadersEnable: Boolean = false,
46+
val enableErrorHeaders: Boolean = false,
4747
)
4848

4949
enum class SchemaCompatibilityMode {

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -248,7 +248,7 @@ internal class Neo4jSinkExtension(
248248
strategies = strategies,
249249
errorTolerance = errorTolerance.resolve(sinkAnnotation),
250250
errorDlqTopic = topicRegistry.resolveTopic(errorDlqTopic.resolve(sinkAnnotation)),
251-
errorDlqContextHeadersEnable = sinkAnnotation.errorDlqContextHeadersEnable)
251+
enableErrorHeaders = sinkAnnotation.enableErrorHeaders)
252252
sink.register(kafkaConnectExternalUri.resolve(sinkAnnotation))
253253
topicRegistry.log()
254254
}

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistration.kt

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ internal class Neo4jSinkRegistration(
3232
retryMaxDelay: Duration = 1000.milliseconds,
3333
errorTolerance: String = "all",
3434
errorDlqTopic: String = "",
35-
errorDlqContextHeadersEnable: Boolean = false,
35+
enableErrorHeaders: Boolean = false,
3636
enableErrorLog: Boolean = true,
3737
includeMessagesInErrorLog: Boolean = true,
3838
schemaControlRegistryUri: String,
@@ -68,9 +68,7 @@ internal class Neo4jSinkRegistration(
6868
"errors.deadletterqueue.topic.replication.factor",
6969
DLQ_TOPIC_REPLICATION_FACTOR)
7070
}
71-
put(
72-
"errors.deadletterqueue.context.headers.enable",
73-
errorDlqContextHeadersEnable)
71+
put("errors.deadletterqueue.context.headers.enable", enableErrorHeaders)
7472
put("errors.log.enable", enableErrorLog)
7573
put("errors.log.include.messages", includeMessagesInErrorLog)
7674
put("neo4j.uri", neo4jUri)

Comments (0)