@@ -93,21 +93,35 @@ class Neo4jSinkTask : SinkTask() {
9393 writeDuration.inWholeMilliseconds,
9494 )
9595 } catch (e: Throwable ) {
96- log.warn(" failed to process messages, trying to identify offending message" , e)
96+ log.warn(" failed to process messages with handler {}" , handler.strategy(), e)
97+ log.warn(
98+ " first and last messages in this batch were: {} and {}" ,
99+ messages.firstOrNull()?.record,
100+ messages.lastOrNull()?.record,
101+ )
102+ if (handled.isNotEmpty()) {
103+ val handledSorted = handled.sortedBy { it.record.kafkaOffset() }
97104
98- val unhandled = messages.minus(handled)
105+ log.warn(
106+ " successfully processed {} messages before failure. first and last messages processed were: {} and {}" ,
107+ handled.size,
108+ handledSorted.first().record.kafkaOffset(),
109+ handledSorted.last().record.kafkaOffset(),
110+ )
111+ }
99112
100- if (unhandled.size > 1 ) {
101- unhandled.forEach { m -> processMessages(handler, listOf (m)) }
102- } else {
103- unhandled.forEach { m ->
104- val reporter = context.errantRecordReporter()
105- if (reporter != null ) {
106- reporter.report(m.record, e).get()
107- } else {
108- throw e
109- }
113+ val reporter = context.errantRecordReporter()
114+ if (reporter != null ) {
115+ log.warn(" DLQ is enabled, will try to identify offending message" , e)
116+ val unhandled = messages.minus(handled)
117+
118+ if (unhandled.size > 1 ) {
119+ unhandled.forEach { m -> processMessages(handler, listOf (m)) }
120+ } else {
121+ unhandled.forEach { m -> reporter.report(m.record, e).get() }
110122 }
123+ } else {
124+ throw e
111125 }
112126 }
113127 }
0 commit comments