Skip to content

Commit e2bc52e

Browse files
authored
Improved usability of the mock APM server for local testing. (#130609)
To verify specific metrics or transactions, you can now filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions. - `-apm-metrics` - `-apm-transactions` - `-apm-transactions-excludes` Relates to ES-10969
1 parent f07770f commit e2bc52e

File tree

3 files changed

+156
-69
lines changed

3 files changed

+156
-69
lines changed

build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java

Lines changed: 90 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,30 @@
99

1010
package org.elasticsearch.gradle.testclusters;
1111

12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import com.fasterxml.jackson.databind.node.ObjectNode;
1214
import com.sun.net.httpserver.HttpExchange;
1315
import com.sun.net.httpserver.HttpHandler;
1416
import com.sun.net.httpserver.HttpServer;
1517

18+
import org.apache.commons.io.IOUtils;
19+
import org.apache.commons.lang3.stream.Streams;
1620
import org.gradle.api.logging.Logger;
1721
import org.gradle.api.logging.Logging;
22+
import org.slf4j.LoggerFactory;
1823

24+
import java.io.BufferedReader;
1925
import java.io.ByteArrayOutputStream;
2026
import java.io.IOException;
2127
import java.io.InputStream;
28+
import java.io.InputStreamReader;
2229
import java.io.OutputStream;
2330
import java.net.InetSocketAddress;
31+
import java.util.Arrays;
32+
import java.util.regex.Pattern;
33+
import java.util.stream.Collectors;
34+
35+
import javax.annotation.concurrent.NotThreadSafe;
2436

2537
/**
2638
* This is a server which just accepts lines of JSON code and if the JSON
@@ -32,102 +44,127 @@
3244
* <p>
3345
* The HTTP server used is the JDK embedded com.sun.net.httpserver
3446
*/
47+
@NotThreadSafe
3548
public class MockApmServer {
3649
private static final Logger logger = Logging.getLogger(MockApmServer.class);
37-
private int port;
50+
private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class);
51+
private final Pattern metricFilter;
52+
private final Pattern transactionFilter;
53+
private final Pattern transactionExcludesFilter;
3854

39-
public MockApmServer(int port) {
40-
this.port = port;
41-
}
55+
private HttpServer instance;
4256

43-
/**
44-
* Simple main that starts a mock APM server and prints the port it is
45-
* running on. This is not needed
46-
* for testing, it is just a convenient template for trying things out
47-
* if you want play around.
48-
*/
49-
public static void main(String[] args) throws IOException, InterruptedException {
50-
MockApmServer server = new MockApmServer(9999);
51-
server.start();
57+
public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) {
58+
this.metricFilter = createWildcardPattern(metricFilter);
59+
this.transactionFilter = createWildcardPattern(transactionFilter);
60+
this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter);
5261
}
5362

54-
private static volatile HttpServer instance;
63+
private Pattern createWildcardPattern(String filter) {
64+
if (filter == null || filter.isEmpty()) {
65+
return null;
66+
}
67+
var pattern = Arrays.stream(filter.split(",\\s*"))
68+
.map(Pattern::quote)
69+
.map(s -> s.replace("*", "\\E.*\\Q"))
70+
.collect(Collectors.joining(")|(", "(", ")"));
71+
return Pattern.compile(pattern);
72+
}
5573

5674
/**
5775
* Start the Mock APM server. Just returns empty JSON structures for every incoming message
5876
*
59-
* @return - the port the Mock APM server started on
6077
* @throws IOException
6178
*/
62-
public synchronized int start() throws IOException {
79+
public void start() throws IOException {
6380
if (instance != null) {
64-
String hostname = instance.getAddress().getHostName();
65-
int port = instance.getAddress().getPort();
66-
logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port);
67-
return port;
81+
throw new IllegalStateException("MockApmServer already started");
6882
}
69-
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port);
83+
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0);
7084
HttpServer server = HttpServer.create(addr, 10);
71-
server.createContext("/exit", new ExitHandler());
7285
server.createContext("/", new RootHandler());
73-
7486
server.start();
7587
instance = server;
7688
logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort());
77-
return server.getAddress().getPort();
7889
}
7990

8091
public int getPort() {
81-
return port;
92+
if (instance == null) {
93+
throw new IllegalStateException("MockApmServer not started");
94+
}
95+
return instance.getAddress().getPort();
8296
}
8397

8498
/**
8599
* Stop the server gracefully if possible
86100
*/
87-
public synchronized void stop() {
88-
logger.lifecycle("stopping apm server");
89-
instance.stop(1);
90-
instance = null;
101+
public void stop() {
102+
if (instance != null) {
103+
logger.lifecycle("stopping apm server");
104+
instance.stop(1);
105+
instance = null;
106+
}
91107
}
92108

93109
class RootHandler implements HttpHandler {
94110
public void handle(HttpExchange t) {
95111
try {
96112
InputStream body = t.getRequestBody();
97-
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
98-
byte[] buffer = new byte[8 * 1024];
99-
int lengthRead;
100-
while ((lengthRead = body.read(buffer)) > 0) {
101-
bytes.write(buffer, 0, lengthRead);
113+
if (metricFilter == null && transactionFilter == null) {
114+
logRequestBody(body);
115+
} else {
116+
logFiltered(body);
102117
}
103-
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
104118

105119
String response = "{}";
106120
t.sendResponseHeaders(200, response.length());
107-
OutputStream os = t.getResponseBody();
108-
os.write(response.getBytes());
109-
os.close();
121+
try (OutputStream os = t.getResponseBody()) {
122+
os.write(response.getBytes());
123+
}
110124
} catch (Exception e) {
111125
e.printStackTrace();
112126
}
113127
}
114-
}
115128

116-
static class ExitHandler implements HttpHandler {
117-
private static final int STOP_TIME = 3;
129+
private void logRequestBody(InputStream body) throws IOException {
130+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
131+
IOUtils.copy(body, bytes);
132+
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
133+
}
118134

119-
public void handle(HttpExchange t) {
120-
try {
121-
InputStream body = t.getRequestBody();
122-
String response = "{}";
123-
t.sendResponseHeaders(200, response.length());
124-
OutputStream os = t.getResponseBody();
125-
os.write(response.getBytes());
126-
os.close();
127-
instance.stop(STOP_TIME);
128-
instance = null;
129-
} catch (Exception e) {
130-
e.printStackTrace();
135+
private void logFiltered(InputStream body) throws IOException {
136+
ObjectMapper mapper = new ObjectMapper();
137+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) {
138+
String line;
139+
String tier = null;
140+
String node = null;
141+
142+
while ((line = reader.readLine()) != null) {
143+
var jsonNode = mapper.readTree(line);
144+
145+
if (jsonNode.has("metadata")) {
146+
node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null);
147+
tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null);
148+
} else if (transactionFilter != null && jsonNode.has("transaction")) {
149+
var transaction = jsonNode.get("transaction");
150+
var name = transaction.get("name").asText();
151+
if (transactionFilter.matcher(name).matches()
152+
&& (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) {
153+
logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction);
154+
}
155+
} else if (metricFilter != null && jsonNode.has("metricset")) {
156+
var metricset = jsonNode.get("metricset");
157+
var samples = (ObjectNode) metricset.get("samples");
158+
for (var name : Streams.of(samples.fieldNames()).toList()) {
159+
if (metricFilter.matcher(name).matches() == false) {
160+
samples.remove(name);
161+
}
162+
}
163+
if (samples.isEmpty() == false) {
164+
logger.lifecycle("Metricset [{}/{}]", node, tier, metricset);
165+
}
166+
}
167+
}
131168
}
132169
}
133170
}

build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public abstract class RunTask extends DefaultTestClustersTask {
4646

4747
private Boolean apmServerEnabled = false;
4848

49+
private String apmServerMetrics = null;
50+
51+
private String apmServerTransactions = null;
52+
53+
private String apmServerTransactionsExcludes = null;
54+
4955
private List<String> plugins;
5056

5157
private Boolean preserveData = false;
@@ -99,11 +105,44 @@ public Boolean getApmServerEnabled() {
99105
return apmServerEnabled;
100106
}
101107

108+
@Input
109+
@Optional
110+
public String getApmServerMetrics() {
111+
return apmServerMetrics;
112+
}
113+
114+
@Input
115+
@Optional
116+
public String getApmServerTransactions() {
117+
return apmServerTransactions;
118+
}
119+
120+
@Input
121+
@Optional
122+
public String getApmServerTransactionsExcludes() {
123+
return apmServerTransactionsExcludes;
124+
}
125+
102126
@Option(option = "with-apm-server", description = "Run simple logging http server to accept apm requests")
103127
public void setApmServerEnabled(Boolean apmServerEnabled) {
104128
this.apmServerEnabled = apmServerEnabled;
105129
}
106130

131+
@Option(option = "apm-metrics", description = "Metric wildcard filter for APM server")
132+
public void setApmServerMetrics(String apmServerMetrics) {
133+
this.apmServerMetrics = apmServerMetrics;
134+
}
135+
136+
@Option(option = "apm-transactions", description = "Transaction wildcard filter for APM server")
137+
public void setApmServerTransactions(String apmServerTransactions) {
138+
this.apmServerTransactions = apmServerTransactions;
139+
}
140+
141+
@Option(option = "apm-transactions-excludes", description = "Transaction wildcard filter for APM server")
142+
public void setApmServerTransactionsExcludes(String apmServerTransactionsExcludes) {
143+
this.apmServerTransactionsExcludes = apmServerTransactionsExcludes;
144+
}
145+
107146
@Option(option = "with-plugins", description = "Run distribution with plugins installed")
108147
public void setPlugins(String plugins) {
109148
this.plugins = Arrays.asList(plugins.split(","));
@@ -204,6 +243,15 @@ public void beforeStart() {
204243
getDataPath = n -> dataDir.resolve(n.getName());
205244
}
206245

246+
if (apmServerEnabled) {
247+
try {
248+
mockServer = new MockApmServer(apmServerMetrics, apmServerTransactions, apmServerTransactionsExcludes);
249+
mockServer.start();
250+
} catch (IOException e) {
251+
throw new GradleException("Unable to start APM server: " + e.getMessage(), e);
252+
}
253+
}
254+
207255
for (ElasticsearchCluster cluster : getClusters()) {
208256
cluster.setPreserveDataDir(preserveData);
209257
for (ElasticsearchNode node : cluster.getNodes()) {
@@ -232,19 +280,12 @@ public void beforeStart() {
232280
node.setting("xpack.security.transport.ssl.keystore.path", "transport.keystore");
233281
node.setting("xpack.security.transport.ssl.certificate_authorities", "transport.ca");
234282
}
235-
236-
if (apmServerEnabled) {
237-
mockServer = new MockApmServer(9999);
238-
try {
239-
mockServer.start();
240-
node.setting("telemetry.metrics.enabled", "true");
241-
node.setting("telemetry.tracing.enabled", "true");
242-
node.setting("telemetry.agent.transaction_sample_rate", "0.10");
243-
node.setting("telemetry.agent.metrics_interval", "10s");
244-
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
245-
} catch (IOException e) {
246-
logger.warn("Unable to start APM server", e);
247-
}
283+
if (mockServer != null) {
284+
node.setting("telemetry.metrics.enabled", "true");
285+
node.setting("telemetry.tracing.enabled", "true");
286+
node.setting("telemetry.agent.transaction_sample_rate", "1.0");
287+
node.setting("telemetry.agent.metrics_interval", "10s");
288+
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
248289
}
249290
// in serverless metrics are enabled by default
250291
// if metrics were not enabled explicitly for gradlew run we should disable them

modules/apm/METERING.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,20 @@ of value that was reported during the metric event
8787

8888
## Development
8989

90-
### Mock http server
90+
### Local mock APM server
9191

9292
The quickest way to verify that your metrics are working is to run `./gradlew run --with-apm-server`.
93-
This will run ES node (or nodes in serverless) and also start a mock http server that will act
94-
as an apm server. This fake http server will log all the http messages it receives from apm-agent
93+
This will run an ES node (or nodes in serverless) and also start a mock APM server that logs all messages it receives from apm-agent.
94+
95+
To verify specific metrics or transactions, you can filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions.
96+
- `-apm-metrics`
97+
- `-apm-transactions`
98+
- `-apm-transactions-excludes`
99+
100+
For example:
101+
```
102+
./gradlew run -Dtests.es.logger.level=WARN --with-apm-server --apm-metric="es.rest.requests.total", --apm-transactions="cluster:monitor/*" --apm-transactions-excludes="cluster:monitor/xpack/*"```
103+
```
95104

96105
### With APM server in cloud
97106
You can also run local ES node with an apm server in cloud.

0 commit comments

Comments
 (0)