Skip to content

Commit 4eab2c2

Browse files
committed
fix: use an aggregate to make sure subquery has a result
1 parent 599b0ac commit 4eab2c2

File tree

3 files changed

+12
-9
lines changed

3 files changed

+12
-9
lines changed

sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcHandler.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ abstract class ApocCdcHandler(
105105
}
106106

107107
private fun batchedStatement(topic: String, partition: Int, events: List<MessageToEvent>): Query {
108-
val termination = if (neo4j.version >= Neo4jVersion(5, 19, 0)) "FINISH" else "RETURN 1"
108+
val termination =
109+
if (neo4j.version >= Neo4jVersion(5, 19, 0)) "FINISH" else "RETURN COUNT(1) AS total"
109110

110111
val query =
111112
if (eosOffsetLabel.isNotBlank()) {
@@ -118,15 +119,15 @@ abstract class ApocCdcHandler(
118119
)
119120
appendLine("WITH k, ${EVENT} WHERE ${EVENT}.offset > k.offset")
120121
appendLine("WITH k, ${EVENT} ORDER BY ${EVENT}.offset ASC")
121-
appendCallSubquery(termination)
122+
appendCallSubquery()
122123
appendLine("WITH k, max(${EVENT}.offset) AS newOffset SET k.offset = newOffset")
123124
append(termination)
124125
}
125126
} else {
126127
buildString {
127128
appendLine("UNWIND \$events AS ${EVENT}")
128129
appendLine("WITH ${EVENT} ORDER BY ${EVENT}.offset ASC")
129-
appendCallSubquery(termination)
130+
appendCallSubquery()
130131
append(termination)
131132
}
132133
}
@@ -153,11 +154,13 @@ abstract class ApocCdcHandler(
153154
)
154155
}
155156

156-
private fun StringBuilder.appendCallSubquery(termination: String) {
157+
private fun StringBuilder.appendCallSubquery() {
157158
if (canIUse(Cypher.callSubqueryWithVariableScopeClause()).withNeo4j(neo4j))
158159
appendLine("CALL (${EVENT}) {")
159160
else appendLine("CALL { WITH ${EVENT}")
160-
appendLine(" CALL apoc.cypher.doIt(${EVENT}.stmt, ${EVENT}.params) YIELD value $termination")
161+
appendLine(
162+
" CALL apoc.cypher.doIt(${EVENT}.stmt, ${EVENT}.params) YIELD value RETURN COUNT(1) AS total"
163+
)
161164
appendLine("}")
162165
}
163166

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSchemaHandlerTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ApocCdcSchemaHandlerWithEOSTest :
5454
|WITH k, e WHERE e.offset > k.offset
5555
|WITH k, e ORDER BY e.offset ASC
5656
|CALL (e) {
57-
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value FINISH
57+
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value RETURN COUNT(1) AS total
5858
|}
5959
|WITH k, max(e.offset) AS newOffset SET k.offset = newOffset
6060
|FINISH
@@ -69,7 +69,7 @@ class ApocCdcSchemaHandlerWithoutEOSTest :
6969
|UNWIND ${'$'}events AS e
7070
|WITH e ORDER BY e.offset ASC
7171
|CALL (e) {
72-
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value FINISH
72+
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value RETURN COUNT(1) AS total
7373
|}
7474
|FINISH
7575
"""

sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/cdc/apoc/ApocCdcSourceIdHandlerTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class ApocCdcSourceIdHandlerWithEOSTest :
4949
|WITH k, e WHERE e.offset > k.offset
5050
|WITH k, e ORDER BY e.offset ASC
5151
|CALL (e) {
52-
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value FINISH
52+
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value RETURN COUNT(1) AS total
5353
|}
5454
|WITH k, max(e.offset) AS newOffset SET k.offset = newOffset
5555
|FINISH
@@ -64,7 +64,7 @@ class ApocCdcSourceIdHandlerWithoutEOSTest :
6464
|UNWIND ${'$'}events AS e
6565
|WITH e ORDER BY e.offset ASC
6666
|CALL (e) {
67-
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value FINISH
67+
| CALL apoc.cypher.doIt(e.stmt, e.params) YIELD value RETURN COUNT(1) AS total
6868
|}
6969
|FINISH
7070
"""

0 commit comments

Comments
 (0)