|
47 | 47 | import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerRecordEntryPointInterceptor; |
48 | 48 | import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerRecordsInterceptor; |
49 | 49 | import com.navercorp.pinpoint.plugin.kafka.interceptor.FetchResponseInterceptor; |
| 50 | +import com.navercorp.pinpoint.plugin.kafka.interceptor.ListenerConsumerInvokeErrorHandlerInterceptor; |
50 | 51 | import com.navercorp.pinpoint.plugin.kafka.interceptor.NetworkClientPollInterceptor; |
51 | 52 | import com.navercorp.pinpoint.plugin.kafka.interceptor.ProcessInterceptor; |
52 | 53 | import com.navercorp.pinpoint.plugin.kafka.interceptor.ProducerAddHeaderInterceptor; |
@@ -104,6 +105,11 @@ public void setup(ProfilerPluginSetupContext context) { |
104 | 105 | transformTemplate.transform("org.apache.kafka.common.requests.FetchResponse", FetchResponseTransform.class); |
105 | 106 |
|
106 | 107 | if (config.isSpringConsumerEnable()) { |
| 108 | + if(config.isKafkaMessageListenerContainerEnable()) { |
| 109 | + // KafkaMessageListenerContainer$ListenerConsumer |
| 110 | + transformTemplate.transform("org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer", ListenerConsumerTransform.class); |
| 111 | + } |
| 112 | + |
107 | 113 | transformTemplate.transform("org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter", AcknowledgingConsumerAwareMessageListenerTransform.class); |
108 | 114 | transformTemplate.transform("org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter", BatchMessagingMessageListenerAdapterTransform.class); |
109 | 115 |
|
@@ -348,6 +354,34 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, |
348 | 354 |
|
349 | 355 | } |
350 | 356 |
|
| 357 | + public static class ListenerConsumerTransform implements TransformCallback { |
| 358 | + |
| 359 | + @Override |
| 360 | + public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { |
| 361 | + final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer); |
| 362 | + |
| 363 | + final InstrumentMethod doInvokeRecordListenerMethod = target.getDeclaredMethod("doInvokeRecordListener", "org.apache.kafka.clients.consumer.ConsumerRecord", "java.util.Iterator"); |
| 364 | + if (doInvokeRecordListenerMethod != null) { |
| 365 | + doInvokeRecordListenerMethod.addScopedInterceptor(ConsumerRecordEntryPointInterceptor.class, va(0), KafkaConstants.SCOPE, ExecutionPolicy.BOUNDARY); |
| 366 | + } |
| 367 | + final InstrumentMethod invokeErrorHandlerMethod = target.getDeclaredMethod("invokeErrorHandler", "org.apache.kafka.clients.consumer.ConsumerRecord", "java.util.Iterator", "java.lang.RuntimeException"); |
| 368 | + if (invokeErrorHandlerMethod != null) { |
| 369 | + invokeErrorHandlerMethod.addInterceptor(ListenerConsumerInvokeErrorHandlerInterceptor.class); |
| 370 | + } |
| 371 | + final InstrumentMethod doInvokeBatchListenerMethod = target.getDeclaredMethod("doInvokeBatchListener", "org.apache.kafka.clients.consumer.ConsumerRecord", "java.util.List"); |
| 372 | + if (doInvokeBatchListenerMethod != null) { |
| 373 | + doInvokeBatchListenerMethod.addScopedInterceptor(ConsumerMultiRecordEntryPointInterceptor.class, va(1), KafkaConstants.SCOPE, ExecutionPolicy.BOUNDARY); |
| 374 | + } |
| 375 | + final InstrumentMethod invokeBatchErrorHandlerMethod = target.getDeclaredMethod("invokeBatchErrorHandler", "org.apache.kafka.clients.consumer.ConsumerRecord", "java.util.List", "java.lang.RuntimeException"); |
| 376 | + if (invokeBatchErrorHandlerMethod != null) { |
| 377 | + invokeBatchErrorHandlerMethod.addInterceptor(ListenerConsumerInvokeErrorHandlerInterceptor.class); |
| 378 | + } |
| 379 | + |
| 380 | + return target.toBytecode(); |
| 381 | + } |
| 382 | + } |
| 383 | + |
| 384 | + |
351 | 385 | public static class AcknowledgingConsumerAwareMessageListenerTransform implements TransformCallback { |
352 | 386 |
|
353 | 387 | @Override |
|
0 commit comments