@@ -121,27 +121,15 @@ abstract class ApocCdcHandler(
121121 )
122122 appendLine(" WITH k, $EVENT WHERE $EVENT .offset > k.offset" )
123123 appendLine(" WITH k, $EVENT ORDER BY $EVENT .offset ASC" )
124- if (canIUse(Cypher .callSubqueryWithVariableScopeClause()).withNeo4j(neo4j))
125- appendLine(" CALL ($EVENT ) {" )
126- else appendLine(" CALL { WITH $EVENT " )
127- appendLine(
128- " CALL apoc.cypher.doIt($EVENT .stmt, $EVENT .params) YIELD value $termination "
129- )
130- appendLine(" }" )
124+ appendCallSubquery(termination)
131125 appendLine(" WITH k, max($EVENT .offset) AS newOffset SET k.offset = newOffset" )
132126 append(termination)
133127 }
134128 } else {
135129 buildString {
136130 appendLine(" UNWIND \$ events AS $EVENT " )
137131 appendLine(" WITH $EVENT ORDER BY $EVENT .offset ASC" )
138- if (canIUse(Cypher .callSubqueryWithVariableScopeClause()).withNeo4j(neo4j))
139- appendLine(" CALL ($EVENT ) {" )
140- else appendLine(" CALL { WITH $EVENT " )
141- appendLine(
142- " CALL apoc.cypher.doIt($EVENT .stmt, $EVENT .params) YIELD value $termination "
143- )
144- appendLine(" }" )
132+ appendCallSubquery(termination)
145133 append(termination)
146134 }
147135 }
@@ -157,6 +145,14 @@ abstract class ApocCdcHandler(
157145 )
158146 }
159147
148+ private fun StringBuilder.appendCallSubquery (termination : String ) {
149+ if (canIUse(Cypher .callSubqueryWithVariableScopeClause()).withNeo4j(neo4j))
150+ appendLine(" CALL ($EVENT ) {" )
151+ else appendLine(" CALL { WITH $EVENT " )
152+ appendLine(" CALL apoc.cypher.doIt($EVENT .stmt, $EVENT .params) YIELD value $termination " )
153+ appendLine(" }" )
154+ }
155+
160156 protected abstract fun transformCreate (event : NodeEvent ): CdcData
161157
162158 protected abstract fun transformUpdate (event : NodeEvent ): CdcData
0 commit comments