1
1
/*
2
- * Copyright 2002-2018 the original author or authors.
2
+ * Copyright 2002-2019 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
27
27
import java .util .concurrent .atomic .AtomicBoolean ;
28
28
import java .util .concurrent .atomic .AtomicLong ;
29
29
import java .util .concurrent .atomic .AtomicReference ;
30
+ import java .util .stream .Collectors ;
30
31
31
32
import javax .lang .model .SourceVersion ;
32
33
import javax .management .DynamicMBean ;
@@ -176,7 +177,6 @@ public IntegrationMBeanExporter() {
176
177
177
178
/**
178
179
* Static properties that will be added to all object names.
179
- *
180
180
* @param objectNameStaticProperties the objectNameStaticProperties to set
181
181
*/
182
182
public void setObjectNameStaticProperties (Map <String , String > objectNameStaticProperties ) {
@@ -186,7 +186,6 @@ public void setObjectNameStaticProperties(Map<String, String> objectNameStaticPr
186
186
/**
187
187
* The JMX domain to use for MBeans registered. Defaults to <code>spring.application</code> (which is useful in
188
188
* SpringSource HQ).
189
- *
190
189
* @param domain the domain name to set
191
190
*/
192
191
public void setDefaultDomain (String domain ) {
@@ -446,7 +445,6 @@ private MessageHandler handlerInAnonymousWrapper(final Object bean) {
446
445
/**
447
446
* Copy of private method in super class. Needed so we can avoid using the bean factory to extract the bean again,
448
447
* and risk it being a proxy (which it almost certainly is by now).
449
- *
450
448
* @param bean the bean instance to register
451
449
* @param beanKey the bean name or human readable version if auto-generated
452
450
* @return the JMX object name of the MBean that was registered
@@ -504,7 +502,6 @@ public void destroy() {
504
502
505
503
/**
506
504
* Shutdown active components.
507
- *
508
505
* @param howLong The time to wait in total for all activities to complete
509
506
* in milliseconds.
510
507
*/
@@ -557,8 +554,7 @@ private void doShutdown() {
557
554
*/
558
555
@ ManagedOperation
559
556
public void stopMessageSources () {
560
- for (Entry <String , MessageSourceMetrics > entry : this .allSourcesByName .entrySet ()) {
561
- MessageSourceMetrics sourceMetrics = entry .getValue ();
557
+ for (MessageSourceMetrics sourceMetrics : this .allSourcesByName .values ()) {
562
558
if (sourceMetrics instanceof Lifecycle ) {
563
559
if (logger .isInfoEnabled ()) {
564
560
logger .info ("Stopping message source " + sourceMetrics );
@@ -592,8 +588,7 @@ public void stopInboundMessageProducers() {
592
588
@ ManagedOperation
593
589
public void stopActiveChannels () {
594
590
// Stop any "active" channels (JMS etc).
595
- for (Entry <String , MessageChannelMetrics > entry : this .allChannelsByName .entrySet ()) {
596
- MessageChannelMetrics metrics = entry .getValue ();
591
+ for (MessageChannelMetrics metrics : this .allChannelsByName .values ()) {
597
592
MessageChannel channel = (MessageChannel ) metrics ;
598
593
if (channel instanceof Lifecycle ) {
599
594
if (logger .isInfoEnabled ()) {
@@ -608,8 +603,7 @@ protected final void orderlyShutdownCapableComponentsBefore() {
608
603
logger .debug ("Initiating stop OrderlyShutdownCapable components" );
609
604
Map <String , OrderlyShutdownCapable > components = this .applicationContext
610
605
.getBeansOfType (OrderlyShutdownCapable .class );
611
- for (Entry <String , OrderlyShutdownCapable > componentEntry : components .entrySet ()) {
612
- OrderlyShutdownCapable component = componentEntry .getValue ();
606
+ for (OrderlyShutdownCapable component : components .values ()) {
613
607
int n = component .beforeShutdown ();
614
608
if (logger .isInfoEnabled ()) {
615
609
logger .info ("Initiated stop for component " + component + "; it reported " + n + " active messages" );
@@ -622,8 +616,7 @@ protected final void orderlyShutdownCapableComponentsAfter() {
622
616
logger .debug ("Finalizing stop OrderlyShutdownCapable components" );
623
617
Map <String , OrderlyShutdownCapable > components = this .applicationContext
624
618
.getBeansOfType (OrderlyShutdownCapable .class );
625
- for (Entry <String , OrderlyShutdownCapable > componentEntry : components .entrySet ()) {
626
- OrderlyShutdownCapable component = componentEntry .getValue ();
619
+ for (OrderlyShutdownCapable component : components .values ()) {
627
620
int n = component .afterShutdown ();
628
621
if (logger .isInfoEnabled ()) {
629
622
logger .info ("Finalized stop for component " + component + "; it reported " + n + " active messages" );
@@ -668,13 +661,11 @@ public long getActiveHandlerCountLong() {
668
661
669
662
@ ManagedMetric (metricType = MetricType .GAUGE , displayName = "Queued Message Count" )
670
663
public int getQueuedMessageCount () {
671
- int count = 0 ;
672
- for (MessageChannelMetrics monitor : this .channels ) {
673
- if (monitor instanceof QueueChannel ) {
674
- count += ((QueueChannel ) monitor ).getQueueSize ();
675
- }
676
- }
677
- return count ;
664
+ return this .channels .stream ()
665
+ .filter (QueueChannel .class ::isInstance )
666
+ .map (QueueChannel .class ::cast )
667
+ .mapToInt (QueueChannel ::getQueueSize )
668
+ .sum ();
678
669
}
679
670
680
671
@ ManagedAttribute
@@ -779,7 +770,7 @@ private void registerHandlers() {
779
770
private void registerHandler (MessageHandlerMetrics handler ) {
780
771
MessageHandlerMetrics monitor = enhanceHandlerMonitor (handler );
781
772
String name = monitor .getManagedName ();
782
- if (matches (this .componentNamePatterns , name )) {
773
+ if (! this . objectNames . containsKey ( handler ) && matches (this .componentNamePatterns , name )) {
783
774
String beanKey = getHandlerBeanKey (monitor );
784
775
if (logger .isInfoEnabled ()) {
785
776
logger .info ("Registering MessageHandler " + name );
@@ -797,7 +788,7 @@ private void registerSource(MessageSourceMetrics source) {
797
788
MessageSourceMetrics monitor = enhanceSourceMonitor (source );
798
789
String name = monitor .getManagedName ();
799
790
this .allSourcesByName .put (name , monitor );
800
- if (matches (this .componentNamePatterns , name )) {
791
+ if (! this . objectNames . containsKey ( source ) && matches (this .componentNamePatterns , name )) {
801
792
String beanKey = getSourceBeanKey (monitor );
802
793
if (logger .isInfoEnabled ()) {
803
794
logger .info ("Registering MessageSource " + name );
@@ -917,17 +908,14 @@ private String getStaticNames() {
917
908
if (this .objectNameStaticProperties .isEmpty ()) {
918
909
return "" ;
919
910
}
920
- StringBuilder builder = new StringBuilder ();
921
911
922
- for ( Entry < Object , Object > entry : this .objectNameStaticProperties .entrySet ()) {
923
- builder . append ( "," + entry . getKey () + "=" + entry . getValue ());
924
- }
925
- return builder . toString ( );
912
+ return ',' + this .objectNameStaticProperties .entrySet ()
913
+ . stream ()
914
+ . map (( entry ) -> entry . getKey () + "=" + entry . getValue ())
915
+ . collect ( Collectors . joining ( "," ) );
926
916
}
927
917
928
918
private MessageHandlerMetrics enhanceHandlerMonitor (MessageHandlerMetrics monitor ) {
929
- MessageHandlerMetrics result = monitor ;
930
-
931
919
if (monitor .getManagedName () != null && monitor .getManagedType () != null ) {
932
920
return monitor ;
933
921
}
@@ -956,11 +944,21 @@ private MessageHandlerMetrics enhanceHandlerMonitor(MessageHandlerMetrics monito
956
944
endpoint = null ;
957
945
}
958
946
}
959
- if (name != null && name .startsWith ('_' + SI_PACKAGE )) {
960
- name = getInternalComponentName (name );
961
- source = "internal" ;
947
+ return buildMessageHandlerMetrics (monitor , name , endpointName , source , endpoint );
948
+ }
949
+
950
+ private MessageHandlerMetrics buildMessageHandlerMetrics (MessageHandlerMetrics monitor ,
951
+ String name , String endpointName , String source , IntegrationConsumer endpoint ) {
952
+
953
+ MessageHandlerMetrics result = monitor ;
954
+ String managedType = source ;
955
+ String managedName = name ;
956
+
957
+ if (managedName != null && managedName .startsWith ('_' + SI_PACKAGE )) {
958
+ managedName = getInternalComponentName (managedName );
959
+ managedType = "internal" ;
962
960
}
963
- if (name != null && name .startsWith (SI_PACKAGE )) {
961
+ if (managedName != null && name .startsWith (SI_PACKAGE )) {
964
962
MessageChannel inputChannel = endpoint .getInputChannel ();
965
963
if (inputChannel != null ) {
966
964
if (!this .anonymousHandlerCounters .containsKey (inputChannel )) {
@@ -976,34 +974,32 @@ private MessageHandlerMetrics enhanceHandlerMonitor(MessageHandlerMetrics monito
976
974
if (total > 1 ) {
977
975
suffix = "#" + total ;
978
976
}
979
- name = inputChannel + suffix ;
980
- source = "anonymous" ;
977
+ managedName = inputChannel + suffix ;
978
+ managedType = "anonymous" ;
981
979
}
982
980
}
983
981
984
982
if (endpoint instanceof Lifecycle ) {
985
983
result = wrapMessageHandlerInLifecycleMetrics (monitor , (Lifecycle ) endpoint );
986
984
}
987
985
988
- if (name == null ) {
986
+ if (managedName == null ) {
989
987
if (monitor instanceof NamedComponent ) {
990
- name = ((NamedComponent ) monitor ).getComponentName ();
988
+ managedName = ((NamedComponent ) monitor ).getComponentName ();
991
989
}
992
- if (name == null ) {
993
- name = monitor .toString ();
990
+ if (managedName == null ) {
991
+ managedName = monitor .toString ();
994
992
}
995
- source = "handler" ;
993
+ managedType = "handler" ;
996
994
}
997
995
998
996
if (endpointName != null ) {
999
997
this .endpointsByMonitor .put (monitor , endpointName );
1000
998
}
1001
999
1002
- result .setManagedType (source );
1003
- result .setManagedName (name );
1004
-
1000
+ result .setManagedType (managedType );
1001
+ result .setManagedName (managedName );
1005
1002
return result ;
1006
-
1007
1003
}
1008
1004
1009
1005
/**
@@ -1038,8 +1034,6 @@ private String getInternalComponentName(String name) {
1038
1034
}
1039
1035
1040
1036
private MessageSourceMetrics enhanceSourceMonitor (MessageSourceMetrics monitor ) {
1041
- MessageSourceMetrics result = monitor ;
1042
-
1043
1037
if (monitor .getManagedName () != null ) {
1044
1038
return monitor ;
1045
1039
}
@@ -1082,6 +1076,13 @@ private MessageSourceMetrics enhanceSourceMonitor(MessageSourceMetrics monitor)
1082
1076
name = getInternalComponentName (name );
1083
1077
source = "internal" ;
1084
1078
}
1079
+ return buildMessageSourceMetricsIfAny (monitor , name , endpointName , source , endpoint );
1080
+ }
1081
+
1082
+ private MessageSourceMetrics buildMessageSourceMetricsIfAny (MessageSourceMetrics monitor , String name ,
1083
+ String endpointName , String source , Object endpoint ) {
1084
+
1085
+ MessageSourceMetrics result = monitor ;
1085
1086
if (name != null && name .startsWith (SI_PACKAGE )) {
1086
1087
Object target = endpoint ;
1087
1088
if (endpoint instanceof Advised ) {
@@ -1122,21 +1123,20 @@ else if (target instanceof SourcePollingChannelAdapter) {
1122
1123
}
1123
1124
1124
1125
if (endpoint instanceof Lifecycle ) {
1125
- result = wrapMessageSourceInLifecycleMetrics (monitor , endpoint );
1126
+ result = wrapMessageSourceInLifecycleMetrics (result , endpoint );
1126
1127
}
1127
1128
1128
1129
if (name == null ) {
1129
- name = monitor .toString ();
1130
+ name = result .toString ();
1130
1131
source = "source" ;
1131
1132
}
1132
1133
1133
1134
if (endpointName != null ) {
1134
- this .endpointsByMonitor .put (monitor , endpointName );
1135
+ this .endpointsByMonitor .put (result , endpointName );
1135
1136
}
1136
1137
1137
- monitor .setManagedType (source );
1138
- monitor .setManagedName (name );
1139
-
1138
+ result .setManagedType (source );
1139
+ result .setManagedName (name );
1140
1140
return result ;
1141
1141
}
1142
1142
0 commit comments