Skip to content

GH-3056: Add Micrometer Metrics to Runtime Graph #3057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.IntegrationConsumer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.endpoint.PollingConsumer;
Expand All @@ -50,6 +51,8 @@
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.ClassUtils;

/**
* Builds the runtime object model graph.
Expand All @@ -62,7 +65,9 @@
*/
public class IntegrationGraphServer implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

private static final float GRAPH_VERSION = 1.0f;
private static final float GRAPH_VERSION = 1.1f;

private static MicrometerNodeEnhancer micrometerEnhancer;

private final NodeFactory nodeFactory = new NodeFactory();

Expand Down Expand Up @@ -152,6 +157,10 @@ protected <T> Map<String, T> getBeansOfType(Class<T> type) {
}

private synchronized Graph buildGraph() {
if (micrometerEnhancer == null && ClassUtils.isPresent("io.micrometer.core.instrument.MeterRegistry",
this.applicationContext.getClassLoader())) {
micrometerEnhancer = new MicrometerNodeEnhancer(this.applicationContext);
}
String implementationVersion = IntegrationGraphServer.class.getPackage().getImplementationVersion();
if (implementationVersion == null) {
implementationVersion = "unknown - is Spring Integration running from the distribution jar?";
Expand Down Expand Up @@ -347,91 +356,128 @@ private static final class NodeFactory {
super();
}

private MessageChannelNode channelNode(String name, MessageChannel channel) {
return new MessageChannelNode(this.nodeId.incrementAndGet(), name, channel);
MessageChannelNode channelNode(String name, MessageChannel channel) {
MessageChannelNode node;
if (channel instanceof PollableChannel) {
node = new PollableChannelNode(this.nodeId.incrementAndGet(), name, channel);
}
else {
node = new MessageChannelNode(this.nodeId.incrementAndGet(), name, channel);
}
if (IntegrationGraphServer.micrometerEnhancer != null) {
node = IntegrationGraphServer.micrometerEnhancer.enhance(node);
}
return node;
}

private MessageGatewayNode gatewayNode(String name, MessagingGatewaySupport gateway) {
MessageGatewayNode gatewayNode(String name, MessagingGatewaySupport gateway) {
String errorChannel = channelToBeanName(gateway.getErrorChannel());
String requestChannel = channelToBeanName(gateway.getRequestChannel());
return new MessageGatewayNode(this.nodeId.incrementAndGet(), name, gateway, requestChannel, errorChannel);
}

@Nullable
private String channelToBeanName(MessageChannel messageChannel) {
String channelToBeanName(MessageChannel messageChannel) {
return messageChannel instanceof NamedComponent
? ((NamedComponent) messageChannel).getBeanName()
: Objects.toString(messageChannel, null);
}

private MessageProducerNode producerNode(String name, MessageProducerSupport producer) {
MessageProducerNode producerNode(String name, MessageProducerSupport producer) {
String errorChannel = channelToBeanName(producer.getErrorChannel());
String outputChannel = channelToBeanName(producer.getOutputChannel());
return new MessageProducerNode(this.nodeId.incrementAndGet(), name, producer,
outputChannel, errorChannel);
}

private MessageSourceNode sourceNode(String name, SourcePollingChannelAdapter adapter) {
MessageSourceNode sourceNode(String name, SourcePollingChannelAdapter adapter) {
String errorChannel = channelToBeanName(adapter.getDefaultErrorChannel());
String outputChannel = channelToBeanName(adapter.getOutputChannel());
return new MessageSourceNode(this.nodeId.incrementAndGet(), name, adapter.getMessageSource(),
String nameToUse = name;
MessageSource<?> source = adapter.getMessageSource();
if (source instanceof NamedComponent) {
nameToUse = ((NamedComponent) source).getComponentName();
}
MessageSourceNode node = new MessageSourceNode(this.nodeId.incrementAndGet(), nameToUse, source,
outputChannel, errorChannel);
if (IntegrationGraphServer.micrometerEnhancer != null) {
node = IntegrationGraphServer.micrometerEnhancer.enhance(node);
}
return node;
}

private MessageHandlerNode handlerNode(String name, IntegrationConsumer consumer) {
MessageHandlerNode handlerNode(String nameArg, IntegrationConsumer consumer) {
String outputChannelName = channelToBeanName(consumer.getOutputChannel());
MessageHandler handler = consumer.getHandler();
MessageHandlerNode node;
String name = nameArg;
if (handler instanceof NamedComponent) {
name = ((NamedComponent) handler).getComponentName();
}
if (handler instanceof CompositeMessageHandler) {
return compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName, null,
node = compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName, null,
false);
}
else if (handler instanceof DiscardingMessageHandler) {
return discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName, null,
node = discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName, null,
false);
}
else if (handler instanceof MappingMessageRouterManagement) {
return routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler,
node = routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler,
outputChannelName, null, false);
}
else if (handler instanceof RecipientListRouterManagement) {
return recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler,
node = recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler,
outputChannelName, null, false);
}
else {
String inputChannel = channelToBeanName(consumer.getInputChannel());
return new MessageHandlerNode(this.nodeId.incrementAndGet(), name, handler,
node = new MessageHandlerNode(this.nodeId.incrementAndGet(), name, handler,
inputChannel, outputChannelName);
}
if (IntegrationGraphServer.micrometerEnhancer != null) {
node = IntegrationGraphServer.micrometerEnhancer.enhance(node);
}
return node;
}

private MessageHandlerNode polledHandlerNode(String name, PollingConsumer consumer) {
MessageHandlerNode polledHandlerNode(String nameArg, PollingConsumer consumer) {
String outputChannelName = channelToBeanName(consumer.getOutputChannel());
String errorChannel = channelToBeanName(consumer.getDefaultErrorChannel());
MessageHandler handler = consumer.getHandler();
MessageHandlerNode node;
String name = nameArg;
if (handler instanceof NamedComponent) {
name = ((NamedComponent) handler).getComponentName();
}
if (handler instanceof CompositeMessageHandler) {
return compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName,
node = compositeHandler(name, consumer, (CompositeMessageHandler) handler, outputChannelName,
errorChannel, true);
}
else if (handler instanceof DiscardingMessageHandler) {
return discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName,
node = discardingHandler(name, consumer, (DiscardingMessageHandler) handler, outputChannelName,
errorChannel, true);
}
else if (handler instanceof MappingMessageRouterManagement) {
return routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler,
node = routingHandler(name, consumer, handler, (MappingMessageRouterManagement) handler,
outputChannelName, errorChannel, true);
}
else if (handler instanceof RecipientListRouterManagement) {
return recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler,
node = recipientListRoutingHandler(name, consumer, handler, (RecipientListRouterManagement) handler,
outputChannelName, errorChannel, true);
}
else {
String inputChannel = channelToBeanName(consumer.getInputChannel());
return new ErrorCapableMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler,
node = new ErrorCapableMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler,
inputChannel, outputChannelName, errorChannel);
}
if (IntegrationGraphServer.micrometerEnhancer != null) {
node = IntegrationGraphServer.micrometerEnhancer.enhance(node);
}
return node;
}

private MessageHandlerNode compositeHandler(String name, IntegrationConsumer consumer,
MessageHandlerNode compositeHandler(String name, IntegrationConsumer consumer,
CompositeMessageHandler handler, String output, String errors, boolean polled) {

List<CompositeMessageHandlerNode.InnerHandler> innerHandlers =
Expand All @@ -453,7 +499,7 @@ private MessageHandlerNode compositeHandler(String name, IntegrationConsumer con
inputChannel, output, innerHandlers);
}

private MessageHandlerNode discardingHandler(String name, IntegrationConsumer consumer,
MessageHandlerNode discardingHandler(String name, IntegrationConsumer consumer,
DiscardingMessageHandler handler, String output, String errors, boolean polled) {

String discards = channelToBeanName(handler.getDiscardChannel());
Expand All @@ -465,7 +511,7 @@ private MessageHandlerNode discardingHandler(String name, IntegrationConsumer co
inputChannel, output, discards);
}

private MessageHandlerNode routingHandler(String name, IntegrationConsumer consumer, MessageHandler handler,
MessageHandlerNode routingHandler(String name, IntegrationConsumer consumer, MessageHandler handler,
MappingMessageRouterManagement router, String output, String errors, boolean polled) {

Collection<String> routes =
Expand All @@ -481,7 +527,7 @@ private MessageHandlerNode routingHandler(String name, IntegrationConsumer consu
inputChannel, output, routes);
}

private MessageHandlerNode recipientListRoutingHandler(String name, IntegrationConsumer consumer,
MessageHandlerNode recipientListRoutingHandler(String name, IntegrationConsumer consumer,
MessageHandler handler, RecipientListRouterManagement router, String output, String errors,
boolean polled) {

Expand All @@ -499,7 +545,7 @@ private MessageHandlerNode recipientListRoutingHandler(String name, IntegrationC
inputChannel, output, routes);
}

private void reset() {
void reset() {
this.nodeId.set(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,16 @@ public void addProperties(@Nullable Map<String, Object> props) {

public static class Stats {

private final String deprecated = "stats are deprecated in favor of sendTimers and receiveCounters";

protected boolean isAvailable() {
return false;
}

public String getDeprecated() {
return this.deprecated;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.graph;

import java.util.function.Supplier;

import org.springframework.messaging.MessageChannel;

/**
Expand All @@ -27,7 +29,9 @@
*
*/
@SuppressWarnings("deprecation")
public class MessageChannelNode extends IntegrationNode {
public class MessageChannelNode extends IntegrationNode implements SendTimersAware {

private Supplier<SendTimers> sendTimers;

public MessageChannelNode(int nodeId, String name, MessageChannel channel) {
super(nodeId, name, channel,
Expand All @@ -36,6 +40,14 @@ public MessageChannelNode(int nodeId, String name, MessageChannel channel) {
: new IntegrationNode.Stats());
}

public SendTimers getSendTimers() {
return this.sendTimers.get();
}

@Override
public void sendTimers(Supplier<SendTimers> timers) {
this.sendTimers = timers;
}

public static final class Stats extends IntegrationNode.Stats {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.graph;

import java.util.function.Supplier;

import org.springframework.messaging.MessageHandler;

/**
Expand All @@ -27,10 +29,12 @@
*
*/
@SuppressWarnings("deprecation")
public class MessageHandlerNode extends EndpointNode {
public class MessageHandlerNode extends EndpointNode implements SendTimersAware {

private final String input;

private Supplier<SendTimers> sendTimers;

public MessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, String output) {
super(nodeId, name, handler, output,
handler instanceof org.springframework.integration.support.management.MessageHandlerMetrics
Expand All @@ -43,6 +47,15 @@ public String getInput() {
return this.input;
}

public SendTimers getSendTimers() {
return this.sendTimers.get();
}

@Override
public void sendTimers(Supplier<SendTimers> timers) {
this.sendTimers = timers;
}

public static final class Stats extends IntegrationNode.Stats {

private final org.springframework.integration.support.management.MessageHandlerMetrics handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.graph;

import java.util.function.Supplier;

import org.springframework.integration.core.MessageSource;

/**
Expand All @@ -27,7 +29,9 @@
*
*/
@SuppressWarnings("deprecation")
public class MessageSourceNode extends ErrorCapableEndpointNode {
public class MessageSourceNode extends ErrorCapableEndpointNode implements ReceiveCountersAware {

private Supplier<ReceiveCounters> receiveCounters;

public MessageSourceNode(int nodeId, String name, MessageSource<?> messageSource, String output, String errors) {
super(nodeId, name, messageSource, output, errors,
Expand All @@ -37,6 +41,14 @@ public MessageSourceNode(int nodeId, String name, MessageSource<?> messageSource
: new IntegrationNode.Stats());
}

public ReceiveCounters getReceiveCounters() {
return this.receiveCounters.get();
}

@Override
public void receiveCounters(Supplier<ReceiveCounters> counters) {
this.receiveCounters = counters;
}

public static final class Stats extends IntegrationNode.Stats {

Expand Down
Loading