Skip to content

Commit 48516d9

Browse files
committed
Vectorize SpectatorHistogram
1 parent d342a10 commit 48516d9

File tree

12 files changed

+1749
-48
lines changed

12 files changed

+1749
-48
lines changed

benchmarks/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@
9696
<artifactId>druid-histogram</artifactId>
9797
<version>${project.parent.version}</version>
9898
</dependency>
99+
<dependency>
100+
<groupId>org.apache.druid.extensions.contrib</groupId>
101+
<artifactId>druid-spectator-histogram</artifactId>
102+
<version>${project.parent.version}</version>
103+
</dependency>
99104
<dependency>
100105
<groupId>org.apache.druid.extensions</groupId>
101106
<artifactId>druid-stats</artifactId>
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark;
21+
22+
import com.fasterxml.jackson.databind.Module;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.google.common.collect.ImmutableMap;
25+
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
26+
import org.apache.druid.jackson.DefaultObjectMapper;
27+
import org.apache.druid.java.util.common.FileUtils;
28+
import org.apache.druid.java.util.common.granularity.Granularities;
29+
import org.apache.druid.java.util.common.guava.Sequence;
30+
import org.apache.druid.java.util.common.logger.Logger;
31+
import org.apache.druid.query.Druids;
32+
import org.apache.druid.query.FinalizeResultsQueryRunner;
33+
import org.apache.druid.query.Query;
34+
import org.apache.druid.query.QueryContexts;
35+
import org.apache.druid.query.QueryPlus;
36+
import org.apache.druid.query.QueryRunner;
37+
import org.apache.druid.query.QueryRunnerFactory;
38+
import org.apache.druid.query.QueryToolChest;
39+
import org.apache.druid.query.Result;
40+
import org.apache.druid.query.aggregation.AggregatorFactory;
41+
import org.apache.druid.query.context.ResponseContext;
42+
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
43+
import org.apache.druid.query.spec.QuerySegmentSpec;
44+
import org.apache.druid.query.timeseries.TimeseriesQuery;
45+
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
46+
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
47+
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
48+
import org.apache.druid.query.timeseries.TimeseriesResultValue;
49+
import org.apache.druid.segment.IncrementalIndexSegment;
50+
import org.apache.druid.segment.IndexIO;
51+
import org.apache.druid.segment.IndexMergerV9;
52+
import org.apache.druid.segment.IndexSpec;
53+
import org.apache.druid.segment.QueryableIndex;
54+
import org.apache.druid.segment.QueryableIndexSegment;
55+
import org.apache.druid.segment.column.ColumnConfig;
56+
import org.apache.druid.segment.generator.DataGenerator;
57+
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
58+
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
59+
import org.apache.druid.segment.incremental.AppendableIndexSpec;
60+
import org.apache.druid.segment.incremental.IncrementalIndex;
61+
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
62+
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
63+
import org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
64+
import org.apache.druid.spectator.histogram.SpectatorHistogramModule;
65+
import org.apache.druid.timeline.SegmentId;
66+
import org.openjdk.jmh.annotations.Benchmark;
67+
import org.openjdk.jmh.annotations.BenchmarkMode;
68+
import org.openjdk.jmh.annotations.Fork;
69+
import org.openjdk.jmh.annotations.Level;
70+
import org.openjdk.jmh.annotations.Measurement;
71+
import org.openjdk.jmh.annotations.Mode;
72+
import org.openjdk.jmh.annotations.OutputTimeUnit;
73+
import org.openjdk.jmh.annotations.Param;
74+
import org.openjdk.jmh.annotations.Scope;
75+
import org.openjdk.jmh.annotations.Setup;
76+
import org.openjdk.jmh.annotations.State;
77+
import org.openjdk.jmh.annotations.TearDown;
78+
import org.openjdk.jmh.annotations.Warmup;
79+
import org.openjdk.jmh.infra.Blackhole;
80+
import org.openjdk.jmh.runner.Runner;
81+
import org.openjdk.jmh.runner.RunnerException;
82+
import org.openjdk.jmh.runner.options.Options;
83+
import org.openjdk.jmh.runner.options.OptionsBuilder;
84+
85+
import java.io.File;
86+
import java.io.IOException;
87+
import java.util.Collections;
88+
import java.util.List;
89+
import java.util.concurrent.TimeUnit;
90+
91+
/**
92+
* Benchmark to compare performance of SpectatorHistogram aggregator with and without vectorization.
93+
*/
94+
@State(Scope.Benchmark)
95+
@Fork(value = 1, jvmArgs = {
96+
"--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED",
97+
"--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED",
98+
"--add-opens=java.base/java.nio=ALL-UNNAMED",
99+
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
100+
"--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED",
101+
"--add-opens=java.base/java.io=ALL-UNNAMED",
102+
"--add-opens=java.base/java.lang=ALL-UNNAMED",
103+
"--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED"
104+
})
105+
@Warmup(iterations = 10)
106+
@Measurement(iterations = 25)
107+
public class SpectatorHistogramAggregatorBenchmark
108+
{
109+
private static final Logger log = new Logger(SpectatorHistogramAggregatorBenchmark.class);
110+
private static final int RNG_SEED = 9999;
111+
private static final IndexMergerV9 INDEX_MERGER_V9;
112+
private static final IndexIO INDEX_IO;
113+
public static final ObjectMapper JSON_MAPPER;
114+
115+
static {
116+
JSON_MAPPER = new DefaultObjectMapper();
117+
// Register the SpectatorHistogram Jackson modules and serde
118+
SpectatorHistogramModule module = new SpectatorHistogramModule();
119+
for (Module jacksonModule : module.getJacksonModules()) {
120+
JSON_MAPPER.registerModule(jacksonModule);
121+
}
122+
SpectatorHistogramModule.registerSerde();
123+
124+
INDEX_IO = new IndexIO(
125+
JSON_MAPPER,
126+
new ColumnConfig()
127+
{
128+
}
129+
);
130+
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
131+
}
132+
133+
@Param({"1000000"})
134+
private int rowsPerSegment;
135+
136+
@Param({"false", "true"})
137+
private String vectorize;
138+
139+
@Param({"long1"})
140+
private String metricName;
141+
142+
private AppendableIndexSpec appendableIndexSpec;
143+
private AggregatorFactory spectatorHistogramFactory;
144+
private DataGenerator generator;
145+
private QueryRunnerFactory factory;
146+
private GeneratorSchemaInfo schemaInfo;
147+
private TimeseriesQuery query;
148+
149+
/**
150+
* Setup everything common for benchmarking both the incremental-index and the queryable-index.
151+
*/
152+
@Setup
153+
public void setup()
154+
{
155+
log.info("SETUP CALLED AT " + System.currentTimeMillis());
156+
157+
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
158+
159+
spectatorHistogramFactory = new SpectatorHistogramAggregatorFactory("spectatorHistogram", metricName);
160+
161+
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
162+
Collections.singletonList(schemaInfo.getDataInterval())
163+
);
164+
165+
generator = new DataGenerator(
166+
schemaInfo.getColumnSchemas(),
167+
RNG_SEED,
168+
schemaInfo.getDataInterval(),
169+
rowsPerSegment
170+
);
171+
172+
query = Druids.newTimeseriesQueryBuilder()
173+
.dataSource("blah")
174+
.granularity(Granularities.ALL)
175+
.intervals(intervalSpec)
176+
.aggregators(Collections.singletonList(spectatorHistogramFactory))
177+
.descending(false)
178+
.build();
179+
180+
factory = new TimeseriesQueryRunnerFactory(
181+
new TimeseriesQueryQueryToolChest(),
182+
new TimeseriesQueryEngine(),
183+
QueryBenchmarkUtil.NOOP_QUERYWATCHER
184+
);
185+
}
186+
187+
/**
188+
* Setup/teardown everything specific for benchmarking the incremental-index.
189+
*/
190+
@State(Scope.Benchmark)
191+
public static class IncrementalIndexState
192+
{
193+
IncrementalIndex incIndex;
194+
195+
@Setup(Level.Invocation)
196+
public void setup(SpectatorHistogramAggregatorBenchmark global)
197+
{
198+
global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
199+
incIndex = global.makeIncIndex(global.spectatorHistogramFactory);
200+
global.generator.addToIndex(incIndex, global.rowsPerSegment);
201+
}
202+
203+
@TearDown(Level.Invocation)
204+
public void tearDown()
205+
{
206+
if (incIndex != null) {
207+
incIndex.close();
208+
}
209+
}
210+
}
211+
212+
/**
213+
* Setup/teardown everything specific for benchmarking the queryable-index.
214+
*/
215+
@State(Scope.Benchmark)
216+
public static class QueryableIndexState
217+
{
218+
private File qIndexesDir;
219+
private QueryableIndex qIndex;
220+
221+
@Setup
222+
public void setup(SpectatorHistogramAggregatorBenchmark global) throws IOException
223+
{
224+
global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
225+
226+
IncrementalIndex incIndex = global.makeIncIndex(global.spectatorHistogramFactory);
227+
global.generator.addToIndex(incIndex, global.rowsPerSegment);
228+
229+
qIndexesDir = FileUtils.createTempDir();
230+
log.info("Using temp dir: " + qIndexesDir.getAbsolutePath());
231+
232+
File indexFile = INDEX_MERGER_V9.persist(
233+
incIndex,
234+
qIndexesDir,
235+
IndexSpec.getDefault(),
236+
null
237+
);
238+
incIndex.close();
239+
240+
qIndex = INDEX_IO.loadIndex(indexFile);
241+
}
242+
243+
@TearDown
244+
public void tearDown()
245+
{
246+
if (qIndex != null) {
247+
qIndex.close();
248+
}
249+
if (qIndexesDir != null) {
250+
qIndexesDir.delete();
251+
}
252+
}
253+
}
254+
255+
private IncrementalIndex makeIncIndex(AggregatorFactory metric)
256+
{
257+
return appendableIndexSpec.builder()
258+
.setSimpleTestingIndexSchema(metric)
259+
.setMaxRowCount(rowsPerSegment)
260+
.build();
261+
}
262+
263+
private static <T> List<T> runQuery(
264+
QueryRunnerFactory factory,
265+
QueryRunner runner,
266+
Query<T> query,
267+
String vectorize
268+
)
269+
{
270+
QueryToolChest toolChest = factory.getToolchest();
271+
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
272+
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
273+
toolChest
274+
);
275+
276+
final QueryPlus<T> queryToRun = QueryPlus.wrap(
277+
query.withOverriddenContext(
278+
ImmutableMap.of(
279+
QueryContexts.VECTORIZE_KEY, vectorize,
280+
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
281+
)
282+
)
283+
);
284+
Sequence<T> queryResult = theRunner.run(queryToRun, ResponseContext.createEmpty());
285+
return queryResult.toList();
286+
}
287+
288+
@Benchmark
289+
@BenchmarkMode(Mode.AverageTime)
290+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
291+
public void queryIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
292+
{
293+
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
294+
factory,
295+
SegmentId.dummy("incIndex"),
296+
new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
297+
);
298+
299+
List<Result<TimeseriesResultValue>> results = runQuery(factory, runner, query, vectorize);
300+
blackhole.consume(results);
301+
}
302+
303+
@Benchmark
304+
@BenchmarkMode(Mode.AverageTime)
305+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
306+
public void queryQueryableIndex(Blackhole blackhole, QueryableIndexState state)
307+
{
308+
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
309+
factory,
310+
SegmentId.dummy("qIndex"),
311+
new QueryableIndexSegment(state.qIndex, SegmentId.dummy("qIndex"))
312+
);
313+
314+
List<Result<TimeseriesResultValue>> results = runQuery(factory, runner, query, vectorize);
315+
blackhole.consume(results);
316+
}
317+
318+
public static void main(String[] args) throws RunnerException
319+
{
320+
Options opt = new OptionsBuilder()
321+
.include(SpectatorHistogramAggregatorBenchmark.class.getSimpleName())
322+
.forks(1)
323+
.warmupIterations(1)
324+
.measurementIterations(2)
325+
.build();
326+
new Runner(opt).run();
327+
}
328+
}
329+

extensions-contrib/spectator-histogram/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,5 +146,10 @@
146146
<artifactId>easymock</artifactId>
147147
<scope>test</scope>
148148
</dependency>
149+
<dependency>
150+
<groupId>org.mockito</groupId>
151+
<artifactId>mockito-core</artifactId>
152+
<scope>test</scope>
153+
</dependency>
149154
</dependencies>
150155
</project>

extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogram.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public class SpectatorHistogram extends Number
143143
// These are accumulated when an entry is added, or when another histogram is merged into this one.
144144
private long sumOfCounts = 0;
145145

146-
static int getMaxIntermdiateHistogramSize()
146+
public static int getMaxIntermediateHistogramSize()
147147
{
148148
return PercentileBuckets.length() * MAX_ENTRY_BYTES;
149149
}
@@ -276,7 +276,7 @@ byte[] toBytes()
276276
return Arrays.copyOf(buffer.array(), buffer.position());
277277
}
278278

279-
void insert(Number num)
279+
public void insert(Number num)
280280
{
281281
this.add(PercentileBuckets.indexOf(num.longValue()), 1L);
282282
}

0 commit comments

Comments
 (0)