@@ -48,15 +48,32 @@ class Neo4jSinkTask : SinkTask() {
         ?.map { SinkMessage(it) }
         ?.groupBy { it.topic }
         ?.mapKeys { topicHandlers.getValue(it.key) }
-        ?.forEach { (handler, messages) ->
-          val txGroups = handler.handle(messages)
-
-          txGroups.forEach { group ->
-            config.session().use { session ->
-              session.writeTransaction(
-                  { tx -> group.forEach { tx.run(it.query).consume() } }, config.txConfig())
-            }
-          }
-        }
+        ?.forEach { (handler, messages) -> processMessages(handler, messages) }
   }

+  private fun processMessages(handler: SinkStrategyHandler, messages: List<SinkMessage>) {
+    val handled = mutableSetOf<SinkMessage>()
+    try {
+      val txGroups = handler.handle(messages)
+
+      txGroups.forEach { group ->
+        config.session().use { session ->
+          session.writeTransaction(
+              { tx -> group.forEach { tx.run(it.query).consume() } },
+              config.txConfig(),
+          )
+        }
+
+        handled.addAll(group.flatMap { it.messages })
+      }
+    } catch (e: Throwable) {
+      val unhandled = messages.minus(handled)
+
+      if (unhandled.size > 1) {
[Review thread]

Contributor: what was the reason for this particular threshold again? I forgot :|

Contributor: also, shouldn't we limit the number of retries here? or even not retry at all, since the driver has retries built into writeTransaction?

Author: This check is to find the specific sink record which makes the connector throw the exception. And on limiting retries: same as the first question, actually, if I didn't misunderstand. The retry here is to find the problematic message and report only that message, not to get the message to work by retrying it.

[End review thread]
+        unhandled.forEach { m -> processMessages(handler, listOf(m)) }
+      } else {
+        unhandled.forEach { m -> context.errantRecordReporter()?.report(m.record, e)?.get() }
+      }
+    }
+  }
}
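The review thread above captures the error-isolation design in processMessages: the whole batch is written first; if anything throws, the messages whose transaction groups already committed are subtracted (via the new ChangeQuery.messages field), and the remainder is re-run one message at a time, so only the genuinely failing record reaches the errant-record reporter. A minimal standalone sketch of that splitting pattern, with the Kafka and Neo4j plumbing stubbed out (processWithIsolation, process and report are hypothetical names, not part of the PR):

// Sketch of the batch-splitting isolation pattern; `process` throws on a bad item.
// The real processMessages additionally subtracts already-committed messages
// (the `handled` set) before retrying; that bookkeeping is omitted here.
fun <T> processWithIsolation(
    items: List<T>,
    process: (List<T>) -> Unit,
    report: (T, Throwable) -> Unit
) {
  try {
    process(items) // fast path: the whole batch in one go
  } catch (e: Throwable) {
    if (items.size > 1) {
      // The batch failed somewhere: re-run each item alone. The retry is not meant
      // to make an item succeed, only to pinpoint which item fails.
      items.forEach { processWithIsolation(listOf(it), process, report) }
    } else {
      // A batch of one that still fails is the problematic record: report it.
      items.forEach { report(it, e) }
    }
  }
}

fun main() {
  processWithIsolation(
      listOf("a", "boom", "b"),
      process = { batch -> batch.forEach { check(it != "boom") { "bad record: $it" } } },
      report = { item, err -> println("reporting $item: ${err.message}") })
  // prints: reporting boom: bad record: boom ("a" and "b" are written normally)
}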
@@ -102,7 +102,12 @@ enum class SinkStrategy(val description: String) {
   RELATIONSHIP_PATTERN("relationship-pattern")
 }

-data class ChangeQuery(val txId: Long?, val seq: Int?, val query: Query)
+data class ChangeQuery(
+    val txId: Long?,
+    val seq: Int?,
+    val messages: Iterable<SinkMessage>,
+    val query: Query
+)

interface SinkStrategyHandler {

@@ -35,19 +35,21 @@ import org.slf4j.LoggerFactory
 abstract class CdcHandler : SinkStrategyHandler {
   private val logger: Logger = LoggerFactory.getLogger(javaClass)

+  data class MessageToEvent(val message: SinkMessage, val changeEvent: ChangeEvent)
+
   override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
     return messages
         .onEach { logger.trace("received message: {}", it) }
-        .map { it.toChangeEvent() }
-        .map { it.txId to it }
-        .onEach { logger.trace("converted message: {} to {}", it.first, it.second) }
+        .map { MessageToEvent(it, it.toChangeEvent()) }
+        .onEach { logger.trace("converted message: {} to {}", it.changeEvent.txId, it.changeEvent) }
         .groupBy(
-            { it.first },
+            { it.changeEvent.txId },
             {
               ChangeQuery(
-                  it.second.txId,
-                  it.second.seq,
-                  when (val event = it.second.event) {
+                  it.changeEvent.txId,
+                  it.changeEvent.seq,
+                  listOf(it.message),
+                  when (val event = it.changeEvent.event) {
                     is NodeEvent ->
                         when (event.operation) {
                           EntityOperation.CREATE -> transformCreate(event)
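A note on the groupBy call above: the two-argument overload takes a key selector and a value transform, so the result maps each CDC transaction id to ready-made ChangeQuery values, and handle can return those groups as per-transaction batches. A tiny self-contained illustration of the same shape (the TxEvent type and values are made up):

data class TxEvent(val txId: Long?, val seq: Int?, val payload: String)

fun main() {
  val events =
      listOf(
          TxEvent(1, 0, "CREATE (:A)"),
          TxEvent(1, 1, "SET a.x = 1"),
          TxEvent(2, 0, "DELETE b"))

  // groupBy(keySelector, valueTransform): values are transformed while grouping,
  // so everything from the same source transaction lands in one batch.
  val byTx: Map<Long?, List<String>> =
      events.groupBy({ it.txId }, { "${it.seq}: ${it.payload}" })

  println(byTx) // {1=[0: CREATE (:A), 1: SET a.x = 1], 2=[0: DELETE b]}
}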
@@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy

 import org.neo4j.cdc.client.model.NodeEvent
 import org.neo4j.cdc.client.model.RelationshipEvent
+import org.neo4j.connectors.kafka.exceptions.InvalidDataException
 import org.neo4j.connectors.kafka.sink.SinkStrategy
 import org.neo4j.cypherdsl.core.Cypher
 import org.neo4j.cypherdsl.core.Node
@@ -30,6 +31,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
   override fun strategy() = SinkStrategy.CDC_SCHEMA

   override fun transformCreate(event: NodeEvent): Query {
+    if (event.after == null) {
+      throw InvalidDataException("create operation requires 'after' field in the event object")
+    }
+
     val node = buildNode(event.keys, "n")
     val stmt =
         Cypher.merge(node)
@@ -48,6 +53,13 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
   }

   override fun transformUpdate(event: NodeEvent): Query {
+    if (event.before == null) {
[Review thread]

Contributor: should we also have similar validation for create and delete events, maybe in a follow-up PR? wdyt?

Author: yes, I think we should, since they can also fail if 'before' or 'after' is null.

[End review thread]
+      throw InvalidDataException("update operation requires 'before' field in the event object")
+    }
+    if (event.after == null) {
+      throw InvalidDataException("update operation requires 'after' field in the event object")
+    }
+
     val node = buildNode(event.keys, "n")
     val stmt =
         Cypher.merge(node)
@@ -81,6 +93,10 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
   }

   override fun transformCreate(event: RelationshipEvent): Query {
+    if (event.after == null) {
+      throw InvalidDataException("create operation requires 'after' field in the event object")
+    }
+
     val (start, end, rel) = buildRelationship(event, "r")
     val stmt =
         Cypher.merge(start)
@@ -93,6 +109,13 @@ class CdcSchemaHandler(val topic: String, private val renderer: Renderer) : CdcH
   }

   override fun transformUpdate(event: RelationshipEvent): Query {
+    if (event.before == null) {
+      throw InvalidDataException("update operation requires 'before' field in the event object")
+    }
+    if (event.after == null) {
+      throw InvalidDataException("update operation requires 'after' field in the event object")
+    }
+
     val (start, end, rel) = buildRelationship(event, "r")
     val stmt =
         Cypher.merge(start)
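The same two null-guards now repeat across every transform of both CDC handlers, and the thread above suggests extending them to the remaining operations. If that follow-up lands, the repetition could collapse into a small shared helper; a sketch under that assumption (requireField is a hypothetical name, not part of this PR):

import org.neo4j.connectors.kafka.exceptions.InvalidDataException

// Hypothetical helper: returns the non-null state object, or fails with the same
// message shape the guards above use.
fun <T : Any> requireField(value: T?, operation: String, field: String): T =
    value
        ?: throw InvalidDataException(
            "$operation operation requires '$field' field in the event object")

// Usage inside a transform would then shrink to a single line, e.g.:
//   val after = requireField(event.after, "update", "after")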
@@ -18,6 +18,7 @@ package org.neo4j.connectors.kafka.sink.strategy

 import org.neo4j.cdc.client.model.NodeEvent
 import org.neo4j.cdc.client.model.RelationshipEvent
+import org.neo4j.connectors.kafka.exceptions.InvalidDataException
 import org.neo4j.connectors.kafka.sink.SinkStrategy
 import org.neo4j.cypherdsl.core.Cypher
 import org.neo4j.cypherdsl.core.Node
@@ -35,6 +36,10 @@ class CdcSourceIdHandler(
   override fun strategy() = SinkStrategy.CDC_SOURCE_ID

   override fun transformCreate(event: NodeEvent): Query {
+    if (event.after == null) {
+      throw InvalidDataException("create operation requires 'after' field in the event object")
+    }
+
     val node = buildNode(event.elementId, "n")
     val stmt =
         Cypher.merge(node)
@@ -52,6 +57,13 @@ class CdcSourceIdHandler(
   }

   override fun transformUpdate(event: NodeEvent): Query {
+    if (event.before == null) {
+      throw InvalidDataException("update operation requires 'before' field in the event object")
+    }
+    if (event.after == null) {
+      throw InvalidDataException("update operation requires 'after' field in the event object")
+    }
+
     val node = buildNode(event.elementId, "n")
     val stmt =
         Cypher.merge(node)
@@ -85,6 +97,10 @@ class CdcSourceIdHandler(
   }

   override fun transformCreate(event: RelationshipEvent): Query {
+    if (event.after == null) {
+      throw InvalidDataException("create operation requires 'after' field in the event object")
+    }
+
     val (start, end, rel) = buildRelationship(event, "r")
     val stmt =
         Cypher.merge(start)
@@ -97,6 +113,13 @@ class CdcSourceIdHandler(
   }

   override fun transformUpdate(event: RelationshipEvent): Query {
+    if (event.before == null) {
+      throw InvalidDataException("update operation requires 'before' field in the event object")
+    }
+    if (event.after == null) {
+      throw InvalidDataException("update operation requires 'after' field in the event object")
+    }
+
     val (start, end, rel) = buildRelationship(event, "r")
     val stmt =
         Cypher.merge(start)
@@ -23,15 +23,19 @@ import org.neo4j.connectors.kafka.sink.SinkStrategy
 import org.neo4j.connectors.kafka.sink.SinkStrategyHandler
 import org.neo4j.connectors.kafka.sink.strategy.cud.Operation
 import org.neo4j.cypherdsl.core.renderer.Renderer
+import org.neo4j.driver.Query
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory

 class CudHandler(val topic: String, private val renderer: Renderer, private val batchSize: Int) :
     SinkStrategyHandler {

   private val logger: Logger = LoggerFactory.getLogger(javaClass)

   override fun strategy() = SinkStrategy.CUD

+  data class MessageToQuery(val message: SinkMessage, val query: Query)
+
   @Suppress("UNCHECKED_CAST")
   override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
     return messages
@@ -44,10 +48,10 @@ class CudHandler(val topic: String, private val renderer: Renderer, private val
                 else -> throw ConnectException("Message value must be convertible to a Map.")
               }
           val cud = Operation.from(value)
-          cud.toQuery(renderer)
+          MessageToQuery(it, cud.toQuery(renderer))
         }
         .chunked(batchSize)
-        .map { it.map { q -> ChangeQuery(null, null, q) } }
+        .map { it.map { data -> ChangeQuery(null, null, listOf(data.message), data.query) } }
         .onEach { logger.trace("mapped messages: '{}'", it) }
         .toList()
   }
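Note the batching shape here: every CUD message keeps its own Query, and chunked(batchSize) only decides how many of those per-message queries share one ChangeQuery group, i.e. one write transaction. A quick illustration with made-up values:

fun main() {
  val queries = listOf("q1", "q2", "q3", "q4", "q5")

  // With batchSize = 2, each inner list becomes one transaction group,
  // but every element still carries its own statement.
  println(queries.chunked(2)) // [[q1, q2], [q3, q4], [q5]]
}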
@@ -81,6 +81,8 @@ class CypherHandler(

   override fun strategy() = SinkStrategy.CYPHER

+  data class MessageToEventMap(val message: SinkMessage, val eventMap: Map<String, Any?>)
+
   override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
     return messages
         .asSequence()
@@ -96,10 +98,17 @@ class CypherHandler(

           logger.trace("message '{}' mapped to: '{}'", it, mapped)

-          mapped
+          MessageToEventMap(it, mapped)
         }
         .chunked(batchSize)
-        .map { listOf(ChangeQuery(null, null, Query(rewrittenQuery, mapOf("events" to it)))) }
+        .map {
+          listOf(
+              ChangeQuery(
+                  null,
+                  null,
+                  it.map { data -> data.message },
+                  Query(rewrittenQuery, mapOf("events" to it.map { data -> data.eventMap }))))
+        }
         .onEach { logger.trace("mapped messages: '{}'", it) }
         .toList()
   }
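The Cypher strategy batches differently from CUD: a whole chunk collapses into one driver Query whose events parameter carries a mapped value per message, which is why the new ChangeQuery has to list every message in the chunk for error attribution. A self-contained sketch of that shape (the event maps are made up, rewrittenQuery's construction is outside this hunk, and the UNWIND is only illustrative):

import org.neo4j.driver.Query

fun main() {
  // One flattened event map per sink message (made-up payloads).
  val events =
      listOf(
          mapOf("id" to 1, "name" to "alice"),
          mapOf("id" to 2, "name" to "bob"))

  // The whole chunk rides behind a single parameter; server-side, the statement
  // iterates it, typically with something like UNWIND.
  val query =
      Query(
          "UNWIND ${'$'}events AS event MERGE (n:Person {id: event.id}) SET n.name = event.name",
          mapOf("events" to events))

  println(query.text())
  println(query.parameters())
}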
@@ -65,6 +65,8 @@ class NodePatternHandler(

   override fun strategy() = SinkStrategy.NODE_PATTERN

+  data class MessageToEventList(val message: SinkMessage, val eventList: List<Any>)
+
   override fun handle(messages: Iterable<SinkMessage>): Iterable<Iterable<ChangeQuery>> {
     return messages
         .asSequence()
@@ -87,10 +89,17 @@ class NodePatternHandler(

           logger.trace("message '{}' mapped to: '{}'", it, mapped)

-          mapped
+          MessageToEventList(it, mapped)
         }
         .chunked(batchSize)
-        .map { listOf(ChangeQuery(null, null, Query(query, mapOf(EVENTS to it)))) }
+        .map {
+          listOf(
+              ChangeQuery(
+                  null,
+                  null,
+                  it.map { data -> data.message },
+                  Query(query, mapOf(EVENTS to it.map { data -> data.eventList }))))
+        }
         .onEach { logger.trace("mapped messages: '{}'", it) }
         .toList()
   }
@@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.sink.strategy
 import java.time.Instant
 import java.time.ZoneOffset
 import org.apache.kafka.connect.errors.ConnectException
+import org.neo4j.connectors.kafka.exceptions.InvalidDataException
 import org.neo4j.connectors.kafka.sink.SinkConfiguration
 import org.neo4j.connectors.kafka.sink.SinkMessage
 import org.neo4j.connectors.kafka.sink.SinkStrategyHandler
@@ -96,17 +97,31 @@ abstract class PatternHandler<T : Pattern>(
         .mapValues { (_, mapping) ->
           if (isExplicit(mapping.from)) {
             val newKey = if (isTombstone) replaceValueWithKey(mapping.from) else mapping.from
-            usedTracker += newKey
-            return@mapValues flattened[newKey]
-          }
-
-          for (prefix in prefixes) {
-            val key = "$prefix.${mapping.from}"
-
-            if (flattened.containsKey(key)) {
-              usedTracker += key
-              return@mapValues flattened[key]
-            }
-          }
+            if (flattened.containsKey(newKey)) {
+              usedTracker += newKey
+              return@mapValues flattened[newKey]
+            }
+            if (mapping.from.startsWith("$bindKeyAs.")) {
+              throw InvalidDataException(
+                  "Key '${mapping.from.replace("$bindKeyAs.", "")}' could not be located in the keys.",
+              )
+            } else {
+              throw InvalidDataException(
+                  "Key '${mapping.from.replace("$bindValueAs.", "")}' could not be located in the values.",
+              )
+            }
+          } else {
+            for (prefix in prefixes) {
+              val key = "$prefix.${mapping.from}"
+
+              if (flattened.containsKey(key)) {
+                usedTracker += key
+                return@mapValues flattened[key]
+              }
+            }
+            throw InvalidDataException(
+                "Key '${mapping.from}' could not be located in the message.",
+            )
+          }
         }
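The reworked lookup above makes missing keys a hard failure in both branches: an explicit reference must resolve exactly as written, while an implicit one is tried under each prefix before giving up with an InvalidDataException. A simplified standalone sketch of that resolution order (the __key/__value prefix strings and the InvalidData class are illustrative stand-ins; the real code also tracks used keys and handles tombstones):

class InvalidData(message: String) : RuntimeException(message)

// Mirrors the two branches above: explicit references fail fast on a miss,
// implicit ones are tried under each prefix before failing.
fun resolve(flattened: Map<String, Any?>, from: String, prefixes: List<String>): Any? {
  val explicit = prefixes.any { from.startsWith("$it.") }
  if (explicit) {
    if (flattened.containsKey(from)) return flattened[from]
    throw InvalidData("Key '$from' could not be located in the message.")
  }
  for (prefix in prefixes) {
    val key = "$prefix.$from"
    if (flattened.containsKey(key)) return flattened[key]
  }
  throw InvalidData("Key '$from' could not be located in the message.")
}

fun main() {
  val flattened = mapOf("__key.id" to 1, "__value.name" to "alice")
  val prefixes = listOf("__key", "__value")

  println(resolve(flattened, "name", prefixes))     // alice (found under __value)
  println(resolve(flattened, "__key.id", prefixes)) // 1 (explicit, used as-is)
  runCatching { resolve(flattened, "__key.missing", prefixes) }
      .onFailure { println(it.message) }            // Key '__key.missing' could not be located...
}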

This file was deleted.
