diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationGraphServer.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationGraphServer.java index fbc0ba705bd..d9b0657a8cd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationGraphServer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationGraphServer.java @@ -35,6 +35,7 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.integration.core.MessageSource; import org.springframework.integration.endpoint.IntegrationConsumer; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.endpoint.PollingConsumer; @@ -50,6 +51,8 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.util.ClassUtils; /** * Builds the runtime object model graph. @@ -62,7 +65,9 @@ */ public class IntegrationGraphServer implements ApplicationContextAware, ApplicationListener { - private static final float GRAPH_VERSION = 1.0f; + private static final float GRAPH_VERSION = 1.1f; + + private static MicrometerNodeEnhancer micrometerEnhancer; private final NodeFactory nodeFactory = new NodeFactory(); @@ -152,6 +157,10 @@ protected Map getBeansOfType(Class type) { } private synchronized Graph buildGraph() { + if (micrometerEnhancer == null && ClassUtils.isPresent("io.micrometer.core.instrument.MeterRegistry", + this.applicationContext.getClassLoader())) { + micrometerEnhancer = new MicrometerNodeEnhancer(this.applicationContext); + } String implementationVersion = IntegrationGraphServer.class.getPackage().getImplementationVersion(); if (implementationVersion == null) { implementationVersion = "unknown - is Spring Integration running from the distribution jar?"; @@ -347,91 +356,128 @@ private static final class NodeFactory { super(); } - private MessageChannelNode channelNode(String name, MessageChannel channel) { - return new MessageChannelNode(this.nodeId.incrementAndGet(), name, channel); + MessageChannelNode channelNode(String name, MessageChannel channel) { + MessageChannelNode node; + if (channel instanceof PollableChannel) { + node = new PollableChannelNode(this.nodeId.incrementAndGet(), name, channel); + } + else { + node = new MessageChannelNode(this.nodeId.incrementAndGet(), name, channel); + } + if (IntegrationGraphServer.micrometerEnhancer != null) { + node = IntegrationGraphServer.micrometerEnhancer.enhance(node); + } + return node; } - private MessageGatewayNode gatewayNode(String name, MessagingGatewaySupport gateway) { + MessageGatewayNode gatewayNode(String name, MessagingGatewaySupport gateway) { String errorChannel = channelToBeanName(gateway.getErrorChannel()); String requestChannel = channelToBeanName(gateway.getRequestChannel()); return new MessageGatewayNode(this.nodeId.incrementAndGet(), name, gateway, requestChannel, errorChannel); } @Nullable - private String channelToBeanName(MessageChannel messageChannel) { + String channelToBeanName(MessageChannel messageChannel) { return messageChannel instanceof NamedComponent ? ((NamedComponent) messageChannel).getBeanName() : Objects.toString(messageChannel, null); } - private MessageProducerNode producerNode(String name, MessageProducerSupport producer) { + MessageProducerNode producerNode(String name, MessageProducerSupport producer) { String errorChannel = channelToBeanName(producer.getErrorChannel()); String outputChannel = channelToBeanName(producer.getOutputChannel()); return new MessageProducerNode(this.nodeId.incrementAndGet(), name, producer, outputChannel, errorChannel); } - private MessageSourceNode sourceNode(String name, SourcePollingChannelAdapter adapter) { + MessageSourceNode sourceNode(String name, SourcePollingChannelAdapter adapter) { String errorChannel = channelToBeanName(adapter.getDefaultErrorChannel()); String outputChannel = channelToBeanName(adapter.getOutputChannel()); - return new MessageSourceNode(this.nodeId.incrementAndGet(), name, adapter.getMessageSource(), + String nameToUse = name; + MessageSource source = adapter.getMessageSource(); + if (source instanceof NamedComponent) { + nameToUse = ((NamedComponent) source).getComponentName(); + } + MessageSourceNode node = new MessageSourceNode(this.nodeId.incrementAndGet(), nameToUse, source, outputChannel, errorChannel); + if (IntegrationGraphServer.micrometerEnhancer != null) { + node = IntegrationGraphServer.micrometerEnhancer.enhance(node); + } + return node; } - private MessageHandlerNode handlerNode(String name, IntegrationConsumer consumer) { + MessageHandlerNode handlerNode(String nameArg, IntegrationConsumer consumer) { String outputChannelName = channelToBeanName(consumer.getOutputChannel()); MessageHandler handler = consumer.getHandler(); + MessageHandlerNode node; + String name = nameArg; + if (handler instanceof NamedComponent) { + name = ((NamedComponent) handler).getComponentName(); + } if (handler instanceof CompositeMessageHandler) { - return compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName, null, + node = compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName, null, false); } else if (handler instanceof DiscardingMessageHandler) { - return discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName, null, + node = discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName, null, false); } else if (handler instanceof MappingMessageRouterManagement) { - return routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler, + node = routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler, outputChannelName, null, false); } else if (handler instanceof RecipientListRouterManagement) { - return recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler, + node = recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler, outputChannelName, null, false); } else { String inputChannel = channelToBeanName(consumer.getInputChannel()); - return new MessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, + node = new MessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, inputChannel, outputChannelName); } + if (IntegrationGraphServer.micrometerEnhancer != null) { + node = IntegrationGraphServer.micrometerEnhancer.enhance(node); + } + return node; } - private MessageHandlerNode polledHandlerNode(String name, PollingConsumer consumer) { + MessageHandlerNode polledHandlerNode(String nameArg, PollingConsumer consumer) { String outputChannelName = channelToBeanName(consumer.getOutputChannel()); String errorChannel = channelToBeanName(consumer.getDefaultErrorChannel()); MessageHandler handler = consumer.getHandler(); + MessageHandlerNode node; + String name = nameArg; + if (handler instanceof NamedComponent) { + name = ((NamedComponent) handler).getComponentName(); + } if (handler instanceof CompositeMessageHandler) { - return compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName, + node = compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName, errorChannel, true); } else if (handler instanceof DiscardingMessageHandler) { - return discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName, + node = discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName, errorChannel, true); } else if (handler instanceof MappingMessageRouterManagement) { - return routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler, + node = routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler, outputChannelName, errorChannel, true); } else if (handler instanceof RecipientListRouterManagement) { - return recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler, + node = recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler, outputChannelName, errorChannel, true); } else { String inputChannel = channelToBeanName(consumer.getInputChannel()); - return new ErrorCapableMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, + node = new ErrorCapableMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, inputChannel, outputChannelName, errorChannel); } + if (IntegrationGraphServer.micrometerEnhancer != null) { + node = IntegrationGraphServer.micrometerEnhancer.enhance(node); + } + return node; } - private MessageHandlerNode compositeHandler(String name, IntegrationConsumer consumer, + MessageHandlerNode compositeHandler(String name, IntegrationConsumer consumer, CompositeMessageHandler handler, String output, String errors, boolean polled) { List innerHandlers = @@ -453,7 +499,7 @@ private MessageHandlerNode compositeHandler(String name, IntegrationConsumer con inputChannel, output, innerHandlers); } - private MessageHandlerNode discardingHandler(String name, IntegrationConsumer consumer, + MessageHandlerNode discardingHandler(String name, IntegrationConsumer consumer, DiscardingMessageHandler handler, String output, String errors, boolean polled) { String discards = channelToBeanName(handler.getDiscardChannel()); @@ -465,7 +511,7 @@ private MessageHandlerNode discardingHandler(String name, IntegrationConsumer co inputChannel, output, discards); } - private MessageHandlerNode routingHandler(String name, IntegrationConsumer consumer, MessageHandler handler, + MessageHandlerNode routingHandler(String name, IntegrationConsumer consumer, MessageHandler handler, MappingMessageRouterManagement router, String output, String errors, boolean polled) { Collection routes = @@ -481,7 +527,7 @@ private MessageHandlerNode routingHandler(String name, IntegrationConsumer consu inputChannel, output, routes); } - private MessageHandlerNode recipientListRoutingHandler(String name, IntegrationConsumer consumer, + MessageHandlerNode recipientListRoutingHandler(String name, IntegrationConsumer consumer, MessageHandler handler, RecipientListRouterManagement router, String output, String errors, boolean polled) { @@ -499,7 +545,7 @@ private MessageHandlerNode recipientListRoutingHandler(String name, IntegrationC inputChannel, output, routes); } - private void reset() { + void reset() { this.nodeId.set(0); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationNode.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationNode.java index c7e188560fb..5b4aaed08bd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationNode.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationNode.java @@ -109,10 +109,16 @@ public void addProperties(@Nullable Map props) { public static class Stats { + private final String deprecated = "stats are deprecated in favor of sendTimers and receiveCounters"; + protected boolean isAvailable() { return false; } + public String getDeprecated() { + return this.deprecated; + } + } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageChannelNode.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageChannelNode.java index 00a3ef7f60c..e86146cf048 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageChannelNode.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageChannelNode.java @@ -16,6 +16,8 @@ package org.springframework.integration.graph; +import java.util.function.Supplier; + import org.springframework.messaging.MessageChannel; /** @@ -27,7 +29,9 @@ * */ @SuppressWarnings("deprecation") -public class MessageChannelNode extends IntegrationNode { +public class MessageChannelNode extends IntegrationNode implements SendTimersAware { + + private Supplier sendTimers; public MessageChannelNode(int nodeId, String name, MessageChannel channel) { super(nodeId, name, channel, @@ -36,6 +40,14 @@ public MessageChannelNode(int nodeId, String name, MessageChannel channel) { : new IntegrationNode.Stats()); } + public SendTimers getSendTimers() { + return this.sendTimers.get(); + } + + @Override + public void sendTimers(Supplier timers) { + this.sendTimers = timers; + } public static final class Stats extends IntegrationNode.Stats { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageHandlerNode.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageHandlerNode.java index 61bf961328f..e92f1baaebb 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageHandlerNode.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageHandlerNode.java @@ -16,6 +16,8 @@ package org.springframework.integration.graph; +import java.util.function.Supplier; + import org.springframework.messaging.MessageHandler; /** @@ -27,10 +29,12 @@ * */ @SuppressWarnings("deprecation") -public class MessageHandlerNode extends EndpointNode { +public class MessageHandlerNode extends EndpointNode implements SendTimersAware { private final String input; + private Supplier sendTimers; + public MessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, String output) { super(nodeId, name, handler, output, handler instanceof org.springframework.integration.support.management.MessageHandlerMetrics @@ -43,6 +47,15 @@ public String getInput() { return this.input; } + public SendTimers getSendTimers() { + return this.sendTimers.get(); + } + + @Override + public void sendTimers(Supplier timers) { + this.sendTimers = timers; + } + public static final class Stats extends IntegrationNode.Stats { private final org.springframework.integration.support.management.MessageHandlerMetrics handler; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageSourceNode.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageSourceNode.java index 13931ffa7a8..7a98436a8c8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageSourceNode.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/MessageSourceNode.java @@ -16,6 +16,8 @@ package org.springframework.integration.graph; +import java.util.function.Supplier; + import org.springframework.integration.core.MessageSource; /** @@ -27,7 +29,9 @@ * */ @SuppressWarnings("deprecation") -public class MessageSourceNode extends ErrorCapableEndpointNode { +public class MessageSourceNode extends ErrorCapableEndpointNode implements ReceiveCountersAware { + + private Supplier receiveCounters; public MessageSourceNode(int nodeId, String name, MessageSource messageSource, String output, String errors) { super(nodeId, name, messageSource, output, errors, @@ -37,6 +41,14 @@ public MessageSourceNode(int nodeId, String name, MessageSource messageSource : new IntegrationNode.Stats()); } + public ReceiveCounters getReceiveCounters() { + return this.receiveCounters.get(); + } + + @Override + public void receiveCounters(Supplier counters) { + this.receiveCounters = counters; + } public static final class Stats extends IntegrationNode.Stats { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/MicrometerNodeEnhancer.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/MicrometerNodeEnhancer.java new file mode 100644 index 00000000000..9cdbf9b5aa9 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/MicrometerNodeEnhancer.java @@ -0,0 +1,148 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.springframework.context.ApplicationContext; +import org.springframework.integration.support.management.IntegrationManagement; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; + +/** + * Add micrometer metrics to the node. + * + * @author Gary Russell + * @since 5.2 + * + */ +public class MicrometerNodeEnhancer { + + private static final TimerStats ZERO_TIMER_STATS = new TimerStats(0L, 0.0, 0.0); + + private final MeterRegistry registry; + + MicrometerNodeEnhancer(ApplicationContext applicationContext) { + Map registries = applicationContext.getBeansOfType(MeterRegistry.class, false, false); + if (registries.size() == 1) { + this.registry = registries.values().iterator().next(); + } + else { + this.registry = null; + } + } + + /** + * Add micrometer metrics to the node. + * @param the node type. + * @param node the node. + * @return the enhanced node. + */ + T enhance(T node) { + if (this.registry != null) { + if (node instanceof MessageChannelNode) { + enhanceWithTimers(node, "channel"); + } + else if (node instanceof MessageHandlerNode) { + enhanceWithTimers(node, "handler"); + } + if (node instanceof PollableChannelNode) { + enhanceWithCounts(node, "channel"); + } + else if (node instanceof MessageSourceNode) { + enhanceWithCounts(node, "source"); + } + } + return node; + } + + private void enhanceWithTimers(T node, String type) { + ((SendTimersAware) node).sendTimers(() -> retrieveTimers(node, type)); + } + + private SendTimers retrieveTimers(T node, String type) { + Timer successTimer = null; + try { + successTimer = this.registry.get(IntegrationManagement.SEND_TIMER_NAME) + .tag("type", type) + .tag("name", node.getName()) + .tag("result", "success") + .timer(); + } + catch (@SuppressWarnings("unused") + Exception e) { + // NOSONAR empty + } + Timer failureTimer = null; + try { + failureTimer = this.registry.get(IntegrationManagement.SEND_TIMER_NAME) + .tag("type", type) + .tag("name", node.getName()) + .tag("result", "failure") + .timer(); + } + catch (@SuppressWarnings("unused") + Exception e) { + // NOSONAR empty; + } + TimerStats successes = successTimer == null ? ZERO_TIMER_STATS + : new TimerStats(successTimer.count(), successTimer.mean(TimeUnit.MILLISECONDS), + successTimer.max(TimeUnit.MILLISECONDS)); + TimerStats failures = failureTimer == null ? ZERO_TIMER_STATS + : new TimerStats(failureTimer.count(), failureTimer.mean(TimeUnit.MILLISECONDS), + failureTimer.max(TimeUnit.MILLISECONDS)); + return new SendTimers(successes, failures); + } + + private void enhanceWithCounts(T node, String type) { + ((ReceiveCountersAware) node).receiveCounters(() -> retrieveCounters(node, type)); + } + + private ReceiveCounters retrieveCounters(T node, String type) { + Counter successes = null; + String name = node.getName(); + try { + successes = this.registry.get(IntegrationManagement.RECEIVE_COUNTER_NAME) + .tag("type", type) + .tag("name", name) + .tag("result", "success") + .counter(); + } + catch (@SuppressWarnings("unused") Exception e) { + // NOSONAR empty; + } + Counter failures = null; + try { + failures = this.registry.get(IntegrationManagement.RECEIVE_COUNTER_NAME) + .tag("type", type) + .tag("name", name) + .tag("result", "failure") + .counter(); + } + catch (@SuppressWarnings("unused") Exception e) { + // NOSONAR empty; + } + return new ReceiveCounters( + (long) (successes == null ? 0 : successes.count()), + (long) (failures == null ? 0 : failures.count())); + } + +} + diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/PollableChannelNode.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/PollableChannelNode.java new file mode 100644 index 00000000000..c2564329892 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/PollableChannelNode.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +import java.util.function.Supplier; + +import org.springframework.messaging.MessageChannel; + +/** + * Represents a pollable channel. + * + * @author Gary Russell + * @since 5.2 + * + */ +public class PollableChannelNode extends MessageChannelNode implements ReceiveCountersAware { + + private Supplier receiveCounters; + + public PollableChannelNode(int nodeId, String name, MessageChannel channel) { + super(nodeId, name, channel); + } + + public ReceiveCounters getReceiveCounters() { + return this.receiveCounters.get(); + } + + @Override + public void receiveCounters(Supplier counters) { + this.receiveCounters = counters; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/ReceiveCounters.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/ReceiveCounters.java new file mode 100644 index 00000000000..9b20c1e0b1d --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/ReceiveCounters.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +/** + * Counters for components that maintain receive counters. + * + * @author Gary Russell + * @since 5.2 + * + */ +public class ReceiveCounters { + + private final long successes; + + private final long failures; + + public ReceiveCounters(long successes, long failures) { + this.successes = successes; + this.failures = failures; + } + + public long getSuccesses() { + return this.successes; + } + + public long getFailures() { + return this.failures; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/ReceiveCountersAware.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/ReceiveCountersAware.java new file mode 100644 index 00000000000..ddc06e05463 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/ReceiveCountersAware.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +import java.util.function.Supplier; + +/** + * @author Gary Russell + * @since 5.2 + * + */ +public interface ReceiveCountersAware { + + void receiveCounters(Supplier counters); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/SendTimers.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/SendTimers.java new file mode 100644 index 00000000000..eadb4224f79 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/SendTimers.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +/** + * Success and failure timer stats. + * + * @author Gary Russell + * @since 5.2 + * + */ +public class SendTimers { + + private final TimerStats successes; + + private final TimerStats failures; + + public SendTimers(TimerStats successes, TimerStats failures) { + this.successes = successes; + this.failures = failures; + } + + public TimerStats getSuccesses() { + return this.successes; + } + + public TimerStats getFailures() { + return this.failures; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/SendTimersAware.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/SendTimersAware.java new file mode 100644 index 00000000000..98702c283f6 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/SendTimersAware.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +import java.util.function.Supplier; + +/** + * @author Gary Russell + * @since 5.2 + * + */ +public interface SendTimersAware { + + void sendTimers(Supplier timers); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/graph/TimerStats.java b/spring-integration-core/src/main/java/org/springframework/integration/graph/TimerStats.java new file mode 100644 index 00000000000..78cfdc4ebc6 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/graph/TimerStats.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.graph; + +/** + * Statistics captured from a timer meter. + * + * @author Gary Russell + * @since 5.2 + * + */ +public class TimerStats { + + private final long count; + + private final double mean; + + private final double max; + + public TimerStats(long count, double mean, double max) { + this.count = count; + this.mean = mean; + this.max = max; + } + + public long getCount() { + return this.count; + } + + public double getMean() { + return this.mean; + } + + public double getMax() { + return this.max; + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java index 14436f970a9..fa01bcafb96 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java @@ -34,6 +34,7 @@ import org.springframework.context.annotation.ImportResource; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.Filter; +import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.Router; @@ -44,9 +45,11 @@ import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.config.EnableIntegrationManagement; import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.context.IntegrationFlowContext; import org.springframework.integration.dsl.context.IntegrationFlowContext.IntegrationFlowRegistration; +import org.springframework.integration.endpoint.AbstractMessageSource; import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.endpoint.PollingConsumer; @@ -64,12 +67,16 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import net.minidev.json.JSONArray; /** @@ -92,6 +99,9 @@ public class IntegrationGraphServerTests { @Autowired private IntegrationFlowContext flowContext; + @Autowired + private MessageSource testSource; + @SuppressWarnings("unchecked") @Test public void test() throws Exception { @@ -107,7 +117,7 @@ public void test() throws Exception { assertThat(map.size()).isEqualTo(3); List> nodes = (List>) map.get("nodes"); assertThat(nodes).isNotNull(); - assertThat(nodes.size()).isEqualTo(32); + assertThat(nodes.size()).isEqualTo(33); JSONArray jsonArray = JsonPathUtils.evaluate(baos.toByteArray(), "$..nodes[?(@.componentType == 'gateway')]"); @@ -125,12 +135,13 @@ public void test() throws Exception { List> links = (List>) map.get("links"); assertThat(links).isNotNull(); - assertThat(links.size()).isEqualTo(33); + assertThat(links.size()).isEqualTo(35); - toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()); - toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "baz").build()); - toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "quxChannel").build()); - toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "fizChannel").build()); + this.toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()); + this.toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "baz").build()); + this.toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "quxChannel").build()); + this.toRouter.send(MessageBuilder.withPayload("foo").setHeader("foo", "fizChannel").build()); + this.testSource.receive(); this.server.rebuild(); graph = this.server.getGraph(); @@ -144,24 +155,44 @@ public void test() throws Exception { assertThat(map.size()).isEqualTo(3); nodes = (List>) map.get("nodes"); assertThat(nodes).isNotNull(); - assertThat(nodes.size()).isEqualTo(32); + assertThat(nodes.size()).isEqualTo(33); links = (List>) map.get("links"); assertThat(links).isNotNull(); - assertThat(links.size()).isEqualTo(35); + assertThat(links.size()).isEqualTo(37); + + jsonArray = JsonPathUtils.evaluate(baos.toByteArray(), "$..nodes[?(@.name == 'router')]"); + String routerJson = jsonArray.toJSONString(); + assertThat(routerJson).contains("\"stats\":{\"deprecated"); + assertThat(routerJson).contains("\"sendTimers\":{\"successes\":{\"count\":4"); + jsonArray = JsonPathUtils.evaluate(baos.toByteArray(), "$..nodes[?(@.name == 'toRouter')]"); + String toRouterJson = jsonArray.toJSONString(); + assertThat(toRouterJson).contains("\"sendTimers\":{\"successes\":{\"count\":4"); + jsonArray = JsonPathUtils.evaluate(baos.toByteArray(), "$..nodes[?(@.name == 'testSource')]"); + String sourceJson = jsonArray.toJSONString(); + assertThat(sourceJson).contains("\"receiveCounters\":{\"successes\":1"); + + // stats refresh without rebuild() + this.testSource.receive(); + baos = new ByteArrayOutputStream(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + objectMapper.writeValue(baos, graph); + jsonArray = JsonPathUtils.evaluate(baos.toByteArray(), "$..nodes[?(@.name == 'testSource')]"); + sourceJson = jsonArray.toJSONString(); + assertThat(sourceJson).contains("\"receiveCounters\":{\"successes\":2"); } @Test public void testIncludesDynamic() { Graph graph = this.server.getGraph(); - assertThat(graph.getNodes().size()).isEqualTo(32); + assertThat(graph.getNodes().size()).isEqualTo(33); IntegrationFlow flow = f -> f.handle(m -> { }); IntegrationFlowRegistration reg = this.flowContext.registration(flow).register(); graph = this.server.rebuild(); - assertThat(graph.getNodes().size()).isEqualTo(34); + assertThat(graph.getNodes().size()).isEqualTo(35); this.flowContext.remove(reg.getId()); graph = this.server.rebuild(); - assertThat(graph.getNodes().size()).isEqualTo(32); + assertThat(graph.getNodes().size()).isEqualTo(33); } @Configuration @@ -188,6 +219,11 @@ public IntegrationGraphServer server() { return server; } + @Bean + public MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + @Bean public MessageProducer producer() { MessageProducerSupport producer = new MessageProducerSupport() { @@ -308,6 +344,24 @@ public MessageChannel fizChannel() { return new QueueChannel(); } + @Bean + @InboundChannelAdapter(channel = "fizChannel", autoStartup = "false") + public MessageSource testSource() { + return new AbstractMessageSource() { + + @Override + public String getComponentType() { + return "source"; + } + + @Override + protected Object doReceive() { + return new GenericMessage<>("foo"); + } + + }; + } + } public static class Services { diff --git a/src/reference/asciidoc/graph.adoc b/src/reference/asciidoc/graph.adoc index c89aa49f2e1..15baf67e436 100644 --- a/src/reference/asciidoc/graph.adoc +++ b/src/reference/asciidoc/graph.adoc @@ -9,67 +9,86 @@ The resulting `Graph` object can be serialized to any format, although JSON is f A Spring Integration application with only the default components would expose a graph as follows: ==== -[source,json] +[source,json,subs="normal"] ---- { - "contentDescriptor": { - "providerVersion": "4.3.0.RELEASE", - "providerFormatVersion": 1.0, - "provider": "spring-integration", - "name": "myApplication" + "contentDescriptor" : { + "providerVersion" : "{project-version}", + "providerFormatVersion" : 1.1, + "provider" : "spring-integration", + "name" : "myAppName:1.0" }, - "nodes": [ - { - "nodeId": 1, - "name": "nullChannel", - "stats": null, - "componentType": "channel" + "nodes" : [ { + "nodeId" : 1, + "componentType" : "null-channel", + "properties" : { }, + "sendTimers" : { + "successes" : { + "count" : 1, + "mean" : 0.0, + "max" : 0.0 + }, + "failures" : { + "count" : 0, + "mean" : 0.0, + "max" : 0.0 + } }, - { - "nodeId": 2, - "name": "errorChannel", - "stats": null, - "componentType": "publish-subscribe-channel" + "receiveCounters" : { + "successes" : 0, + "failures" : 0 }, - { - "nodeId": 3, - "name": "_org.springframework.integration.errorLogger", - "stats": { - "duration": { - "count": 0, - "min": 0.0, - "max": 0.0, - "mean": 0.0, - "standardDeviation": 0.0, - "countLong": 0 - }, - "errorCount": 0, - "standardDeviationDuration": 0.0, - "countsEnabled": true, - "statsEnabled": true, - "loggingEnabled": false, - "handleCount": 0, - "meanDuration": 0.0, - "maxDuration": 0.0, - "minDuration": 0.0, - "activeCount": 0 + "name" : "nullChannel" + }, { + "nodeId" : 2, + "componentType" : "publish-subscribe-channel", + "properties" : { }, + "sendTimers" : { + "successes" : { + "count" : 1, + "mean" : 7.807002, + "max" : 7.807002 }, - "componentType": "logging-channel-adapter", - "output": null, - "input": "errorChannel" - } - ], - "links": [ - { - "from": 2, - "to": 3, - "type": "input" - } - ] + "failures" : { + "count" : 0, + "mean" : 0.0, + "max" : 0.0 + } + }, + "name" : "errorChannel" + }, { + "nodeId" : 3, + "componentType" : "logging-channel-adapter", + "properties" : { }, + "output" : null, + "input" : "errorChannel", + "sendTimers" : { + "successes" : { + "count" : 1, + "mean" : 6.742722, + "max" : 6.742722 + }, + "failures" : { + "count" : 0, + "mean" : 0.0, + "max" : 0.0 + } + }, + "name" : "_org.springframework.integration.errorLogger" + } ], + "links" : [ { + "from" : 2, + "to" : 3, + "type" : "input" + } ] } ---- ==== +NOTE: Version 5.2 has deprecated the legacy metrics in favor of Micrometer meters as discussed <<./metrics.adoc#metrics-management,Metrics Management>>. +While not shown above, the legacy metrics (under the `stats` child node) will continue to appear in the graph, but with an extra child node `"deprecated" : "stats are deprecated in favor of sendTimers and receiveCounters"`. +The `providerFormatVersion` has been changed to 1.1 to reflect this change. + In the preceding example, the graph consists of three top-level elements. The `contentDescriptor` graph element contains general information about the application providing the data. diff --git a/src/reference/asciidoc/metrics.adoc b/src/reference/asciidoc/metrics.adoc index bf52d4aefe7..4a4381701aa 100644 --- a/src/reference/asciidoc/metrics.adoc +++ b/src/reference/asciidoc/metrics.adoc @@ -94,6 +94,9 @@ IMPORTANT: Starting with version 5.0.2, the framework automatically detects whether the application context has a single `MetricsFactory` bean and, if so, uses it instead of the default metrics factory. +IMPORTANT: These legacy metrics have been deprecated in favor of Micrometer metrics discussed below. +Legacy metrics support will be removed in a future release. + [[micrometer-integration]] ==== Micrometer Integration