mirror of https://github.com/apache/druid.git
Add benchmark for expressions. (#4366)
* Add benchmark for expressions. * Code review comments.
This commit is contained in:
parent
13ecf90923
commit
fd55c894ce
|
@ -78,6 +78,11 @@
|
||||||
<artifactId>json-flattener</artifactId>
|
<artifactId>json-flattener</artifactId>
|
||||||
<version>0.1.0</version>
|
<version>0.1.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.easymock</groupId>
|
||||||
|
<artifactId>easymock</artifactId>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
|
@ -87,8 +92,8 @@
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<jmh.version>1.17.2</jmh.version>
|
<jmh.version>1.19</jmh.version>
|
||||||
<javac.target>1.7</javac.target>
|
<javac.target>1.8</javac.target>
|
||||||
<uberjar.name>benchmarks</uberjar.name>
|
<uberjar.name>benchmarks</uberjar.name>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,244 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.benchmark;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import io.druid.benchmark.datagen.BenchmarkColumnSchema;
|
||||||
|
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||||
|
import io.druid.benchmark.datagen.SegmentGenerator;
|
||||||
|
import io.druid.java.util.common.granularity.Granularities;
|
||||||
|
import io.druid.java.util.common.guava.Sequence;
|
||||||
|
import io.druid.java.util.common.guava.Sequences;
|
||||||
|
import io.druid.js.JavaScriptConfig;
|
||||||
|
import io.druid.query.aggregation.BufferAggregator;
|
||||||
|
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
|
||||||
|
import io.druid.segment.ColumnSelectorFactory;
|
||||||
|
import io.druid.segment.Cursor;
|
||||||
|
import io.druid.segment.FloatColumnSelector;
|
||||||
|
import io.druid.segment.QueryableIndex;
|
||||||
|
import io.druid.segment.QueryableIndexStorageAdapter;
|
||||||
|
import io.druid.segment.VirtualColumns;
|
||||||
|
import io.druid.segment.column.ValueType;
|
||||||
|
import io.druid.timeline.DataSegment;
|
||||||
|
import io.druid.timeline.partition.LinearShardSpec;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
|
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||||
|
import org.openjdk.jmh.annotations.Fork;
|
||||||
|
import org.openjdk.jmh.annotations.Level;
|
||||||
|
import org.openjdk.jmh.annotations.Measurement;
|
||||||
|
import org.openjdk.jmh.annotations.Mode;
|
||||||
|
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||||
|
import org.openjdk.jmh.annotations.Param;
|
||||||
|
import org.openjdk.jmh.annotations.Scope;
|
||||||
|
import org.openjdk.jmh.annotations.Setup;
|
||||||
|
import org.openjdk.jmh.annotations.State;
|
||||||
|
import org.openjdk.jmh.annotations.TearDown;
|
||||||
|
import org.openjdk.jmh.annotations.Warmup;
|
||||||
|
import org.openjdk.jmh.infra.Blackhole;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@State(Scope.Benchmark)
|
||||||
|
@Fork(value = 1)
|
||||||
|
@Warmup(iterations = 15)
|
||||||
|
@Measurement(iterations = 30)
|
||||||
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||||
|
public class ExpressionBenchmark
|
||||||
|
{
|
||||||
|
@Param({"1000000"})
|
||||||
|
private int rowsPerSegment;
|
||||||
|
|
||||||
|
private SegmentGenerator segmentGenerator;
|
||||||
|
private QueryableIndex index;
|
||||||
|
private JavaScriptAggregatorFactory javaScriptAggregatorFactory;
|
||||||
|
private DoubleSumAggregatorFactory expressionAggregatorFactory;
|
||||||
|
private ByteBuffer aggregationBuffer = ByteBuffer.allocate(Double.BYTES);
|
||||||
|
|
||||||
|
@Setup(Level.Trial)
|
||||||
|
public void setup() throws Exception
|
||||||
|
{
|
||||||
|
final BenchmarkSchemaInfo schemaInfo = new BenchmarkSchemaInfo(
|
||||||
|
ImmutableList.of(
|
||||||
|
BenchmarkColumnSchema.makeNormal("x", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false),
|
||||||
|
BenchmarkColumnSchema.makeNormal("y", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false)
|
||||||
|
),
|
||||||
|
ImmutableList.of(),
|
||||||
|
new Interval("2000/P1D"),
|
||||||
|
false
|
||||||
|
);
|
||||||
|
|
||||||
|
final DataSegment dataSegment = DataSegment.builder()
|
||||||
|
.dataSource("foo")
|
||||||
|
.interval(schemaInfo.getDataInterval())
|
||||||
|
.version("1")
|
||||||
|
.shardSpec(new LinearShardSpec(0))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
this.segmentGenerator = new SegmentGenerator();
|
||||||
|
this.index = segmentGenerator.generate(dataSegment, schemaInfo, rowsPerSegment);
|
||||||
|
this.javaScriptAggregatorFactory = new JavaScriptAggregatorFactory(
|
||||||
|
"name",
|
||||||
|
ImmutableList.of("x", "y"),
|
||||||
|
"function(current,x,y) { if (x > 0) { return current + x + 1 } else { return current + y + 1 } }",
|
||||||
|
"function() { return 0 }",
|
||||||
|
"function(a,b) { return a + b }",
|
||||||
|
JavaScriptConfig.getEnabledInstance()
|
||||||
|
);
|
||||||
|
this.expressionAggregatorFactory = new DoubleSumAggregatorFactory(
|
||||||
|
"name",
|
||||||
|
null,
|
||||||
|
"if(x>0,1.0+x,y+1)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TearDown(Level.Trial)
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
if (index != null) {
|
||||||
|
index.close();
|
||||||
|
index = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (segmentGenerator != null) {
|
||||||
|
segmentGenerator.close();
|
||||||
|
segmentGenerator = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public void queryUsingJavaScript(Blackhole blackhole) throws Exception
|
||||||
|
{
|
||||||
|
final Double result = compute(javaScriptAggregatorFactory::factorizeBuffered);
|
||||||
|
blackhole.consume(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public void queryUsingExpression(Blackhole blackhole) throws Exception
|
||||||
|
{
|
||||||
|
final Double result = compute(expressionAggregatorFactory::factorizeBuffered);
|
||||||
|
blackhole.consume(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public void queryUsingNative(Blackhole blackhole) throws Exception
|
||||||
|
{
|
||||||
|
final Double result = compute(
|
||||||
|
columnSelectorFactory ->
|
||||||
|
new NativeBufferAggregator(
|
||||||
|
columnSelectorFactory.makeFloatColumnSelector("x"),
|
||||||
|
columnSelectorFactory.makeFloatColumnSelector("y")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
blackhole.consume(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private double compute(final Function<ColumnSelectorFactory, BufferAggregator> aggregatorFactory)
|
||||||
|
{
|
||||||
|
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(index);
|
||||||
|
|
||||||
|
final Sequence<Cursor> cursors = adapter.makeCursors(
|
||||||
|
null,
|
||||||
|
index.getDataInterval(),
|
||||||
|
VirtualColumns.EMPTY,
|
||||||
|
Granularities.ALL,
|
||||||
|
false
|
||||||
|
);
|
||||||
|
|
||||||
|
final List<Double> results = Sequences.toList(
|
||||||
|
Sequences.map(
|
||||||
|
cursors,
|
||||||
|
cursor -> {
|
||||||
|
final BufferAggregator bufferAggregator = aggregatorFactory.apply(cursor);
|
||||||
|
bufferAggregator.init(aggregationBuffer, 0);
|
||||||
|
|
||||||
|
while (!cursor.isDone()) {
|
||||||
|
bufferAggregator.aggregate(aggregationBuffer, 0);
|
||||||
|
cursor.advance();
|
||||||
|
}
|
||||||
|
|
||||||
|
final Double dbl = (Double) bufferAggregator.get(aggregationBuffer, 0);
|
||||||
|
bufferAggregator.close();
|
||||||
|
return dbl;
|
||||||
|
}
|
||||||
|
),
|
||||||
|
new ArrayList<>()
|
||||||
|
);
|
||||||
|
|
||||||
|
return Iterables.getOnlyElement(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class NativeBufferAggregator implements BufferAggregator
|
||||||
|
{
|
||||||
|
private final FloatColumnSelector xSelector;
|
||||||
|
private final FloatColumnSelector ySelector;
|
||||||
|
|
||||||
|
public NativeBufferAggregator(final FloatColumnSelector xSelector, final FloatColumnSelector ySelector)
|
||||||
|
{
|
||||||
|
this.xSelector = xSelector;
|
||||||
|
this.ySelector = ySelector;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final ByteBuffer buf, final int position)
|
||||||
|
{
|
||||||
|
buf.putDouble(0, 0d);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void aggregate(final ByteBuffer buf, final int position)
|
||||||
|
{
|
||||||
|
final float x = xSelector.get();
|
||||||
|
final double n = x > 0 ? x + 1 : ySelector.get() + 1;
|
||||||
|
buf.putDouble(0, buf.getDouble(position) + n);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object get(final ByteBuffer buf, final int position)
|
||||||
|
{
|
||||||
|
return buf.getDouble(position);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getFloat(final ByteBuffer buf, final int position)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLong(final ByteBuffer buf, final int position)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -100,7 +100,7 @@ import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class FilterPartitionBenchmark
|
public class FilterPartitionBenchmark
|
||||||
|
|
|
@ -100,7 +100,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class FilteredAggregatorBenchmark
|
public class FilteredAggregatorBenchmark
|
||||||
|
|
|
@ -106,7 +106,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
// Benchmark for determining the interface overhead of GroupBy with multiple type implementations
|
// Benchmark for determining the interface overhead of GroupBy with multiple type implementations
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 15)
|
@Warmup(iterations = 15)
|
||||||
@Measurement(iterations = 30)
|
@Measurement(iterations = 30)
|
||||||
public class GroupByTypeInterfaceBenchmark
|
public class GroupByTypeInterfaceBenchmark
|
||||||
|
|
|
@ -99,7 +99,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
// Benchmark for determining the interface overhead of TopN with multiple type implementations
|
// Benchmark for determining the interface overhead of TopN with multiple type implementations
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class TopNTypeInterfaceBenchmark
|
public class TopNTypeInterfaceBenchmark
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.benchmark.datagen;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.io.Files;
|
||||||
|
import io.druid.data.input.InputRow;
|
||||||
|
import io.druid.data.input.impl.DimensionSchema;
|
||||||
|
import io.druid.data.input.impl.DimensionsSpec;
|
||||||
|
import io.druid.data.input.impl.FloatDimensionSchema;
|
||||||
|
import io.druid.data.input.impl.LongDimensionSchema;
|
||||||
|
import io.druid.data.input.impl.StringDimensionSchema;
|
||||||
|
import io.druid.hll.HyperLogLogHash;
|
||||||
|
import io.druid.java.util.common.ISE;
|
||||||
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||||
|
import io.druid.segment.IndexBuilder;
|
||||||
|
import io.druid.segment.IndexSpec;
|
||||||
|
import io.druid.segment.QueryableIndex;
|
||||||
|
import io.druid.segment.QueryableIndexIndexableAdapter;
|
||||||
|
import io.druid.segment.TestHelper;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||||
|
import io.druid.segment.serde.ComplexMetrics;
|
||||||
|
import io.druid.timeline.DataSegment;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class SegmentGenerator implements Closeable
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(SegmentGenerator.class);
|
||||||
|
|
||||||
|
private static final int MAX_ROWS_IN_MEMORY = 200000;
|
||||||
|
private static final int STARTING_SEED = 9999; // Consistent seed for reproducibility
|
||||||
|
|
||||||
|
private final File tempDir;
|
||||||
|
private final AtomicInteger seed;
|
||||||
|
|
||||||
|
public SegmentGenerator()
|
||||||
|
{
|
||||||
|
this.tempDir = Files.createTempDir();
|
||||||
|
this.seed = new AtomicInteger(STARTING_SEED);
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueryableIndex generate(
|
||||||
|
final DataSegment dataSegment,
|
||||||
|
final BenchmarkSchemaInfo schemaInfo,
|
||||||
|
final int numRows
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// In case we need to generate hyperUniques.
|
||||||
|
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
|
||||||
|
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||||
|
}
|
||||||
|
|
||||||
|
final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
|
||||||
|
schemaInfo.getColumnSchemas(),
|
||||||
|
seed.getAndIncrement(),
|
||||||
|
schemaInfo.getDataInterval(),
|
||||||
|
numRows
|
||||||
|
);
|
||||||
|
|
||||||
|
final List<DimensionSchema> dimensions = new ArrayList<>();
|
||||||
|
for (BenchmarkColumnSchema columnSchema : schemaInfo.getColumnSchemas()) {
|
||||||
|
if (schemaInfo.getAggs().stream().noneMatch(agg -> agg.getName().equals(columnSchema.getName()))) {
|
||||||
|
switch (columnSchema.getType()) {
|
||||||
|
case STRING:
|
||||||
|
dimensions.add(new StringDimensionSchema(columnSchema.getName()));
|
||||||
|
break;
|
||||||
|
case LONG:
|
||||||
|
dimensions.add(new LongDimensionSchema(columnSchema.getName()));
|
||||||
|
break;
|
||||||
|
case FLOAT:
|
||||||
|
dimensions.add(new FloatDimensionSchema(columnSchema.getName()));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new ISE("Unhandleable type[%s]", columnSchema.getType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
|
||||||
|
.withDimensionsSpec(new DimensionsSpec(dimensions, ImmutableList.of(), ImmutableList.of()))
|
||||||
|
.withMetrics(schemaInfo.getAggsArray())
|
||||||
|
.withRollup(schemaInfo.isWithRollup())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final List<InputRow> rows = new ArrayList<>();
|
||||||
|
final List<QueryableIndex> indexes = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < numRows; i++) {
|
||||||
|
final InputRow row = dataGenerator.nextRow();
|
||||||
|
rows.add(row);
|
||||||
|
|
||||||
|
if ((i + 1) % 20000 == 0) {
|
||||||
|
log.info("%,d/%,d rows generated.", i + 1, numRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rows.size() % MAX_ROWS_IN_MEMORY == 0) {
|
||||||
|
indexes.add(makeIndex(dataSegment.getIdentifier(), indexes.size(), rows, indexSchema));
|
||||||
|
rows.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("%,d/%,d rows generated.", numRows, numRows);
|
||||||
|
|
||||||
|
if (rows.size() > 0) {
|
||||||
|
indexes.add(makeIndex(dataSegment.getIdentifier(), indexes.size(), rows, indexSchema));
|
||||||
|
rows.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (indexes.isEmpty()) {
|
||||||
|
throw new ISE("No rows to index?");
|
||||||
|
} else if (indexes.size() == 1) {
|
||||||
|
return Iterables.getOnlyElement(indexes);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
final QueryableIndex merged = TestHelper.getTestIndexIO().loadIndex(
|
||||||
|
TestHelper.getTestIndexMergerV9().merge(
|
||||||
|
indexes.stream().map(QueryableIndexIndexableAdapter::new).collect(Collectors.toList()),
|
||||||
|
false,
|
||||||
|
schemaInfo.getAggs()
|
||||||
|
.stream()
|
||||||
|
.map(AggregatorFactory::getCombiningFactory)
|
||||||
|
.toArray(AggregatorFactory[]::new),
|
||||||
|
new File(tempDir, "merged"),
|
||||||
|
new IndexSpec()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
for (QueryableIndex index : indexes) {
|
||||||
|
index.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
FileUtils.deleteDirectory(tempDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueryableIndex makeIndex(
|
||||||
|
final String identifier,
|
||||||
|
final int indexNumber,
|
||||||
|
final List<InputRow> rows,
|
||||||
|
final IncrementalIndexSchema indexSchema
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return IndexBuilder
|
||||||
|
.create()
|
||||||
|
.schema(indexSchema)
|
||||||
|
.tmpDir(new File(new File(tempDir, identifier), String.valueOf(indexNumber)))
|
||||||
|
.indexMerger(TestHelper.getTestIndexMergerV9())
|
||||||
|
.rows(rows)
|
||||||
|
.buildMMappedIndex();
|
||||||
|
}
|
||||||
|
}
|
|
@ -72,7 +72,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class IncrementalIndexReadBenchmark
|
public class IncrementalIndexReadBenchmark
|
||||||
|
|
|
@ -51,7 +51,7 @@ import java.util.ArrayList;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class IndexIngestionBenchmark
|
public class IndexIngestionBenchmark
|
||||||
|
|
|
@ -63,7 +63,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class IndexMergeBenchmark
|
public class IndexMergeBenchmark
|
||||||
|
|
|
@ -62,7 +62,7 @@ import java.util.ArrayList;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class IndexPersistBenchmark
|
public class IndexPersistBenchmark
|
||||||
|
|
|
@ -106,7 +106,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 15)
|
@Warmup(iterations = 15)
|
||||||
@Measurement(iterations = 30)
|
@Measurement(iterations = 30)
|
||||||
public class GroupByBenchmark
|
public class GroupByBenchmark
|
||||||
|
|
|
@ -105,7 +105,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class SearchBenchmark
|
public class SearchBenchmark
|
||||||
|
|
|
@ -97,7 +97,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class SelectBenchmark
|
public class SelectBenchmark
|
||||||
|
|
|
@ -23,13 +23,11 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
|
|
||||||
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||||
import io.druid.benchmark.datagen.BenchmarkSchemas;
|
import io.druid.benchmark.datagen.BenchmarkSchemas;
|
||||||
|
import io.druid.benchmark.datagen.SegmentGenerator;
|
||||||
import io.druid.common.utils.JodaUtils;
|
import io.druid.common.utils.JodaUtils;
|
||||||
import io.druid.data.input.InputRow;
|
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
import io.druid.hll.HyperLogLogHash;
|
|
||||||
import io.druid.java.util.common.granularity.Granularities;
|
import io.druid.java.util.common.granularity.Granularities;
|
||||||
import io.druid.java.util.common.guava.Sequence;
|
import io.druid.java.util.common.guava.Sequence;
|
||||||
import io.druid.java.util.common.guava.Sequences;
|
import io.druid.java.util.common.guava.Sequences;
|
||||||
|
@ -38,15 +36,11 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||||
import io.druid.query.TableDataSource;
|
import io.druid.query.TableDataSource;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
|
||||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||||
import io.druid.query.dimension.DimensionSpec;
|
import io.druid.query.dimension.DimensionSpec;
|
||||||
import io.druid.query.groupby.GroupByQuery;
|
import io.druid.query.groupby.GroupByQuery;
|
||||||
import io.druid.segment.IndexBuilder;
|
|
||||||
import io.druid.segment.QueryableIndex;
|
import io.druid.segment.QueryableIndex;
|
||||||
import io.druid.segment.TestHelper;
|
|
||||||
import io.druid.segment.column.ValueType;
|
import io.druid.segment.column.ValueType;
|
||||||
import io.druid.segment.serde.ComplexMetrics;
|
|
||||||
import io.druid.server.initialization.ServerConfig;
|
import io.druid.server.initialization.ServerConfig;
|
||||||
import io.druid.sql.calcite.planner.Calcites;
|
import io.druid.sql.calcite.planner.Calcites;
|
||||||
import io.druid.sql.calcite.planner.DruidPlanner;
|
import io.druid.sql.calcite.planner.DruidPlanner;
|
||||||
|
@ -82,7 +76,6 @@ import org.openjdk.jmh.infra.Blackhole;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -90,18 +83,19 @@ import java.util.concurrent.TimeUnit;
|
||||||
* Benchmark that compares the same groupBy query through the native query layer and through the SQL layer.
|
* Benchmark that compares the same groupBy query through the native query layer and through the SQL layer.
|
||||||
*/
|
*/
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 15)
|
@Warmup(iterations = 15)
|
||||||
@Measurement(iterations = 30)
|
@Measurement(iterations = 30)
|
||||||
public class SqlBenchmark
|
public class SqlBenchmark
|
||||||
{
|
{
|
||||||
@Param({"10000", "100000", "200000"})
|
@Param({"200000", "1000000"})
|
||||||
private int rowsPerSegment;
|
private int rowsPerSegment;
|
||||||
|
|
||||||
private static final Logger log = new Logger(SqlBenchmark.class);
|
private static final Logger log = new Logger(SqlBenchmark.class);
|
||||||
private static final int RNG_SEED = 9999;
|
private static final int RNG_SEED = 9999;
|
||||||
|
|
||||||
private File tmpDir;
|
private File tmpDir;
|
||||||
|
private SegmentGenerator segmentGenerator;
|
||||||
private SpecificSegmentsQuerySegmentWalker walker;
|
private SpecificSegmentsQuerySegmentWalker walker;
|
||||||
private PlannerFactory plannerFactory;
|
private PlannerFactory plannerFactory;
|
||||||
private GroupByQuery groupByQuery;
|
private GroupByQuery groupByQuery;
|
||||||
|
@ -113,46 +107,22 @@ public class SqlBenchmark
|
||||||
tmpDir = Files.createTempDir();
|
tmpDir = Files.createTempDir();
|
||||||
log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", tmpDir, rowsPerSegment);
|
log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", tmpDir, rowsPerSegment);
|
||||||
|
|
||||||
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
|
|
||||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
|
|
||||||
}
|
|
||||||
|
|
||||||
final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
|
final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
|
||||||
final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
|
|
||||||
schemaInfo.getColumnSchemas(),
|
|
||||||
RNG_SEED + 1,
|
|
||||||
schemaInfo.getDataInterval(),
|
|
||||||
rowsPerSegment
|
|
||||||
);
|
|
||||||
|
|
||||||
final List<InputRow> rows = Lists.newArrayList();
|
final DataSegment dataSegment = DataSegment.builder()
|
||||||
for (int i = 0; i < rowsPerSegment; i++) {
|
.dataSource("foo")
|
||||||
final InputRow row = dataGenerator.nextRow();
|
.interval(schemaInfo.getDataInterval())
|
||||||
if (i % 20000 == 0) {
|
.version("1")
|
||||||
log.info("%,d/%,d rows generated.", i, rowsPerSegment);
|
.shardSpec(new LinearShardSpec(0))
|
||||||
}
|
.build();
|
||||||
rows.add(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("%,d/%,d rows generated.", rows.size(), rowsPerSegment);
|
this.segmentGenerator = new SegmentGenerator();
|
||||||
|
|
||||||
final PlannerConfig plannerConfig = new PlannerConfig();
|
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, rowsPerSegment);
|
||||||
final QueryRunnerFactoryConglomerate conglomerate = CalciteTests.queryRunnerFactoryConglomerate();
|
final QueryRunnerFactoryConglomerate conglomerate = CalciteTests.queryRunnerFactoryConglomerate();
|
||||||
final QueryableIndex index = IndexBuilder.create()
|
final PlannerConfig plannerConfig = new PlannerConfig();
|
||||||
.tmpDir(new File(tmpDir, "1"))
|
|
||||||
.indexMerger(TestHelper.getTestIndexMergerV9())
|
|
||||||
.rows(rows)
|
|
||||||
.buildMMappedIndex();
|
|
||||||
|
|
||||||
this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
|
this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
|
||||||
DataSegment.builder()
|
|
||||||
.dataSource("foo")
|
|
||||||
.interval(index.getDataInterval())
|
|
||||||
.version("1")
|
|
||||||
.shardSpec(new LinearShardSpec(0))
|
|
||||||
.build(),
|
|
||||||
index
|
|
||||||
);
|
|
||||||
|
|
||||||
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
|
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
|
||||||
"foo",
|
"foo",
|
||||||
|
@ -211,6 +181,11 @@ public class SqlBenchmark
|
||||||
walker = null;
|
walker = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (segmentGenerator != null) {
|
||||||
|
segmentGenerator.close();
|
||||||
|
segmentGenerator = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (tmpDir != null) {
|
if (tmpDir != null) {
|
||||||
FileUtils.deleteDirectory(tmpDir);
|
FileUtils.deleteDirectory(tmpDir);
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class TimeseriesBenchmark
|
public class TimeseriesBenchmark
|
||||||
|
|
|
@ -97,7 +97,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
@Fork(value = 1)
|
||||||
@Warmup(iterations = 10)
|
@Warmup(iterations = 10)
|
||||||
@Measurement(iterations = 25)
|
@Measurement(iterations = 25)
|
||||||
public class TopNBenchmark
|
public class TopNBenchmark
|
||||||
|
|
Loading…
Reference in New Issue