Add benchmark data generator, basic ingestion/persist/merge/query benchmarks (#2875)

This commit is contained in:
Jonathan Wei 2016-05-25 16:39:37 -07:00 committed by Fangjin Yang
parent 7e67397b5a
commit b72c54c4f8
21 changed files with 4174 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularities; import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -47,7 +48,7 @@ import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark) @State(Scope.Benchmark)
public class IncrementalIndexAddRowsBenchmark public class IncrementalIndexRowTypeBenchmark
{ {
private IncrementalIndex incIndex; private IncrementalIndex incIndex;
private IncrementalIndex incFloatIndex; private IncrementalIndex incFloatIndex;

View File

@ -0,0 +1,429 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import io.druid.segment.column.ValueType;
import java.util.List;
public class BenchmarkColumnSchema
{
/**
* SEQUENTIAL: Generate integer or enumerated values in sequence. Not random.
*
* DISCRETE_UNIFORM: Discrete uniform distribution, generates integers or enumerated values.
*
* ROUNDED_NORMAL: Discrete distribution that rounds sample values from an underlying normal
* distribution
*
* ZIPF: Discrete Zipf distribution.
* Lower numbers have higher probability.
* Can also generate Zipf distribution from a list of enumerated values.
*
* ENUMERATED: Discrete distribution, generated from lists of values and associated probabilities.
*
* NORMAL: Continuous normal distribution.
*
* UNIFORM: Continuous uniform distribution.
*/
public enum ValueDistribution
{
// discrete distributions
SEQUENTIAL,
DISCRETE_UNIFORM,
ROUNDED_NORMAL,
ZIPF,
ENUMERATED,
// continuous distributions
UNIFORM,
NORMAL
}
/**
* Generate values according to this distribution.
*/
private ValueDistribution distributionType;
/**
* Name of the column.
*/
private String name;
/**
* Value type of this column.
*/
private ValueType type;
/**
* Is this column a metric or dimension?
*/
private boolean isMetric;
/**
* Controls how many values are generated per row (use > 1 for multi-value dimensions)
*/
private int rowSize;
/**
* Probability that a null row will be generated instead of a row with values sampled from the distribution.
*/
private final Double nullProbability;
/**
* When used in discrete distributions, the set of possible values to be generated.
*/
private List<Object> enumeratedValues;
/**
* When using ENUMERATED distribution, the probabilities associated with the set of values to be generated.
* The probabilities in this list must follow the same order as those in enumeratedValues.
* Probabilities do not need to sum to 1.0, they will be automatically normalized.
*/
private List<Double> enumeratedProbabilities;
/**
* Range of integer values to generate in ZIPF and DISCRETE_NORMAL distributions.
*/
private Integer startInt;
private Integer endInt;
/**
* Range of double values to generate in NORMAL distribution.
*/
private Double startDouble;
private Double endDouble;
/**
* Exponent for the ZIPF distribution.
*/
private Double zipfExponent;
/**
* Mean and standard deviation for the NORMAL and ROUNDED_NORMAL distributions.
*/
private Double mean;
private Double standardDeviation;
private BenchmarkColumnSchema(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
ValueDistribution distributionType
)
{
this.name = name;
this.type = type;
this.isMetric = isMetric;
this.distributionType = distributionType;
this.rowSize = rowSize;
this.nullProbability = nullProbability;
}
public BenchmarkColumnValueGenerator makeGenerator(long seed)
{
return new BenchmarkColumnValueGenerator(this, seed);
}
public String getName()
{
return name;
}
public Double getNullProbability()
{
return nullProbability;
}
public ValueType getType()
{
return type;
}
public boolean isMetric()
{
return isMetric;
}
public ValueDistribution getDistributionType()
{
return distributionType;
}
public int getRowSize()
{
return rowSize;
}
public List<Object> getEnumeratedValues()
{
return enumeratedValues;
}
public List<Double> getEnumeratedProbabilities()
{
return enumeratedProbabilities;
}
public Integer getStartInt()
{
return startInt;
}
public Integer getEndInt()
{
return endInt;
}
public Double getStartDouble()
{
return startDouble;
}
public Double getEndDouble()
{
return endDouble;
}
public Double getZipfExponent()
{
return zipfExponent;
}
public Double getMean()
{
return mean;
}
public Double getStandardDeviation()
{
return standardDeviation;
}
public static BenchmarkColumnSchema makeSequential(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
int startInt,
int endInt
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.SEQUENTIAL
);
schema.startInt = startInt;
schema.endInt = endInt;
return schema;
};
public static BenchmarkColumnSchema makeEnumeratedSequential(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
List<Object> enumeratedValues
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.SEQUENTIAL
);
schema.enumeratedValues = enumeratedValues;
return schema;
};
public static BenchmarkColumnSchema makeDiscreteUniform(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
int startInt,
int endInt
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.DISCRETE_UNIFORM
);
schema.startInt = startInt;
schema.endInt = endInt;
return schema;
};
public static BenchmarkColumnSchema makeEnumeratedDiscreteUniform(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
List<Object> enumeratedValues
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.DISCRETE_UNIFORM
);
schema.enumeratedValues = enumeratedValues;
return schema;
};
public static BenchmarkColumnSchema makeContinuousUniform(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
double startDouble,
double endDouble
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.UNIFORM
);
schema.startDouble = startDouble;
schema.endDouble = endDouble;
return schema;
};
public static BenchmarkColumnSchema makeNormal(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
Double mean,
Double standardDeviation,
boolean useRounding
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
useRounding ? ValueDistribution.ROUNDED_NORMAL : ValueDistribution.NORMAL
);
schema.mean = mean;
schema.standardDeviation = standardDeviation;
return schema;
};
public static BenchmarkColumnSchema makeZipf(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
int startInt,
int endInt,
Double zipfExponent
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.ZIPF
);
schema.startInt = startInt;
schema.endInt = endInt;
schema.zipfExponent = zipfExponent;
return schema;
};
public static BenchmarkColumnSchema makeEnumeratedZipf(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
List<Object> enumeratedValues,
Double zipfExponent
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.ZIPF
);
schema.enumeratedValues = enumeratedValues;
schema.zipfExponent = zipfExponent;
return schema;
};
public static BenchmarkColumnSchema makeEnumerated(
String name,
ValueType type,
boolean isMetric,
int rowSize,
Double nullProbability,
List<Object> enumeratedValues,
List<Double> enumeratedProbabilities
)
{
BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
name,
type,
isMetric,
rowSize,
nullProbability,
ValueDistribution.ENUMERATED
);
schema.enumeratedValues = enumeratedValues;
schema.enumeratedProbabilities = enumeratedProbabilities;
return schema;
};
}

View File

@ -0,0 +1,215 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import io.druid.segment.column.ValueType;
import org.apache.commons.math3.distribution.AbstractIntegerDistribution;
import org.apache.commons.math3.distribution.AbstractRealDistribution;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.commons.math3.distribution.UniformRealDistribution;
import org.apache.commons.math3.distribution.ZipfDistribution;
import org.apache.commons.math3.util.Pair;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BenchmarkColumnValueGenerator
{
private final BenchmarkColumnSchema schema;
private final long seed;
private Serializable distribution;
private Random simpleRng;
public BenchmarkColumnValueGenerator(
BenchmarkColumnSchema schema,
long seed
)
{
this.schema = schema;
this.seed = seed;
simpleRng = new Random(seed);
initDistribution();
}
public Object generateRowValue()
{
Double nullProbability = schema.getNullProbability();
int rowSize = schema.getRowSize();
if (nullProbability != null) {
Double randDouble = simpleRng.nextDouble();
if (randDouble <= nullProbability) {
return null;
}
}
if (rowSize == 1) {
return generateSingleRowValue();
} else {
List<Object> rowVals = new ArrayList<>(rowSize);
for (int i = 0; i < rowSize; i++) {
rowVals.add(generateSingleRowValue());
}
return rowVals;
}
}
public BenchmarkColumnSchema getSchema()
{
return schema;
}
public long getSeed()
{
return seed;
}
private Object generateSingleRowValue()
{
Object ret = null;
ValueType type = schema.getType();
if (distribution instanceof AbstractIntegerDistribution) {
ret = ((AbstractIntegerDistribution) distribution).sample();
} else if (distribution instanceof AbstractRealDistribution) {
ret = ((AbstractRealDistribution) distribution).sample();
} else if (distribution instanceof EnumeratedDistribution) {
ret = ((EnumeratedDistribution) distribution).sample();
}
ret = convertType(ret, type);
return ret;
}
private Object convertType(Object input, ValueType type)
{
if (input == null) {
return null;
}
Object ret;
switch (type) {
case STRING:
ret = input.toString();
break;
case LONG:
if (input instanceof Number) {
ret = ((Number) input).longValue();
} else {
ret = Long.parseLong(input.toString());
}
break;
case FLOAT:
if (input instanceof Number) {
ret = ((Number) input).floatValue();
} else {
ret = Float.parseFloat(input.toString());
}
break;
default:
throw new UnsupportedOperationException("Unknown data type: " + type);
}
return ret;
}
private void initDistribution()
{
BenchmarkColumnSchema.ValueDistribution distributionType = schema.getDistributionType();
ValueType type = schema.getType();
List<Object> enumeratedValues = schema.getEnumeratedValues();
List<Double> enumeratedProbabilities = schema.getEnumeratedProbabilities();
List<Pair<Object, Double>> probabilities = new ArrayList<>();
switch (distributionType) {
case SEQUENTIAL:
// not random, just cycle through numbers from start to end, or cycle through enumerated values if provided
distribution = new SequentialDistribution(
schema.getStartInt(),
schema.getEndInt(),
schema.getEnumeratedValues()
);
break;
case UNIFORM:
distribution = new UniformRealDistribution(schema.getStartDouble(), schema.getEndDouble());
break;
case DISCRETE_UNIFORM:
if (enumeratedValues == null) {
enumeratedValues = new ArrayList<>();
for (int i = schema.getStartInt(); i < schema.getEndInt(); i++) {
Object val = convertType(i, type);
enumeratedValues.add(val);
}
}
// give them all equal probability, the library will normalize probabilities to sum to 1.0
for (int i = 0; i < enumeratedValues.size(); i++) {
probabilities.add(new Pair<>(enumeratedValues.get(i), 0.1));
}
distribution = new EnumeratedTreeDistribution<>(probabilities);
break;
case NORMAL:
distribution = new NormalDistribution(schema.getMean(), schema.getStandardDeviation());
break;
case ROUNDED_NORMAL:
NormalDistribution normalDist = new NormalDistribution(schema.getMean(), schema.getStandardDeviation());
distribution = new RealRoundingDistribution(normalDist);
break;
case ZIPF:
int cardinality;
if (enumeratedValues == null) {
Integer startInt = schema.getStartInt();
cardinality = schema.getEndInt() - startInt;
ZipfDistribution zipf = new ZipfDistribution(cardinality, schema.getZipfExponent());
for (int i = 0; i < cardinality; i++) {
probabilities.add(new Pair<>((Object) (i + startInt), zipf.probability(i)));
}
} else {
cardinality = enumeratedValues.size();
ZipfDistribution zipf = new ZipfDistribution(enumeratedValues.size(), schema.getZipfExponent());
for (int i = 0; i < cardinality; i++) {
probabilities.add(new Pair<>(enumeratedValues.get(i), zipf.probability(i)));
}
}
distribution = new EnumeratedTreeDistribution<>(probabilities);
break;
case ENUMERATED:
for (int i = 0; i < enumeratedValues.size(); i++) {
probabilities.add(new Pair<>(enumeratedValues.get(i), enumeratedProbabilities.get(i)));
}
distribution = new EnumeratedTreeDistribution<>(probabilities);
break;
default:
throw new UnsupportedOperationException("Unknown distribution type: " + distributionType);
}
if (distribution instanceof AbstractIntegerDistribution) {
((AbstractIntegerDistribution) distribution).reseedRandomGenerator(seed);
} else if (distribution instanceof AbstractRealDistribution) {
((AbstractRealDistribution) distribution).reseedRandomGenerator(seed);
} else if (distribution instanceof EnumeratedDistribution) {
((EnumeratedDistribution) distribution).reseedRandomGenerator(seed);
}
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BenchmarkDataGenerator
{
private final List<BenchmarkColumnSchema> columnSchemas;
private final long seed;
private List<BenchmarkColumnValueGenerator> columnGenerators;
private final long startTime;
private final long endTime;
private final int numConsecutiveTimestamps;
private final double timestampIncrement;
private double currentTime;
private int timeCounter;
private List<String> dimensionNames;
public BenchmarkDataGenerator(
List<BenchmarkColumnSchema> columnSchemas,
final long seed,
long startTime,
int numConsecutiveTimestamps,
Double timestampIncrement
)
{
this.columnSchemas = columnSchemas;
this.seed = seed;
this.startTime = startTime;
this.endTime = Long.MAX_VALUE;
this.numConsecutiveTimestamps = numConsecutiveTimestamps;
this.timestampIncrement = timestampIncrement;
this.currentTime = startTime;
init();
}
public BenchmarkDataGenerator(
List<BenchmarkColumnSchema> columnSchemas,
final long seed,
Interval interval,
int numRows
)
{
this.columnSchemas = columnSchemas;
this.seed = seed;
this.startTime = interval.getStartMillis();
this.endTime = interval.getEndMillis();
long timeDelta = endTime - startTime;
this.timestampIncrement = timeDelta / (numRows * 1.0);
this.numConsecutiveTimestamps = 0;
init();
}
public InputRow nextRow()
{
Map<String, Object> event = new HashMap<>();
for (BenchmarkColumnValueGenerator generator : columnGenerators) {
event.put(generator.getSchema().getName(), generator.generateRowValue());
}
MapBasedInputRow row = new MapBasedInputRow(nextTimestamp(), dimensionNames, event);
return row;
}
private void init()
{
this.timeCounter = 0;
this.currentTime = startTime;
dimensionNames = new ArrayList<>();
for (BenchmarkColumnSchema schema: columnSchemas) {
if (!schema.isMetric()) {
dimensionNames.add(schema.getName());
}
}
columnGenerators = new ArrayList<>();
columnGenerators.addAll(
Lists.transform(
columnSchemas,
new Function<BenchmarkColumnSchema, BenchmarkColumnValueGenerator>()
{
@Override
public BenchmarkColumnValueGenerator apply(
BenchmarkColumnSchema input
)
{
return input.makeGenerator(seed);
}
}
)
);
}
private long nextTimestamp()
{
timeCounter += 1;
if (timeCounter > numConsecutiveTimestamps) {
currentTime += timestampIncrement;
timeCounter = 0;
}
long newMillis = Math.round(currentTime);
if (newMillis > endTime) {
return endTime;
} else {
return newMillis;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import io.druid.benchmark.datagen.BenchmarkColumnSchema;
import io.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;
import java.util.List;
public class BenchmarkSchemaInfo
{
private List<BenchmarkColumnSchema> columnSchemas;
private List<AggregatorFactory> aggs;
private Interval dataInterval;
public BenchmarkSchemaInfo (
List<BenchmarkColumnSchema> columnSchemas,
List<AggregatorFactory> aggs,
Interval dataInterval
)
{
this.columnSchemas = columnSchemas;
this.aggs = aggs;
this.dataInterval = dataInterval;
}
public List<BenchmarkColumnSchema> getColumnSchemas()
{
return columnSchemas;
}
public List<AggregatorFactory> getAggs()
{
return aggs;
}
public AggregatorFactory[] getAggsArray()
{
return aggs.toArray(new AggregatorFactory[0]);
}
public Interval getDataInterval()
{
return dataInterval;
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import com.google.common.collect.ImmutableList;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.column.ValueType;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class BenchmarkSchemas
{
public static final Map<String, BenchmarkSchemaInfo> SCHEMA_MAP = new LinkedHashMap<>();
static { // basic schema
List<BenchmarkColumnSchema> basicSchemaColumns = ImmutableList.of(
// dims
BenchmarkColumnSchema.makeSequential("dimSequential", ValueType.STRING, false, 1, null, 0, 1000),
BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 101, 1.0),
BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 1000000),
BenchmarkColumnSchema.makeSequential("dimSequentialHalfNull", ValueType.STRING, false, 1, 0.5, 0, 1000),
BenchmarkColumnSchema.makeEnumerated(
"dimMultivalEnumerated",
ValueType.STRING,
false,
4,
null,
Arrays.<Object>asList("Hello", "World", "Foo", "Bar", "Baz"),
Arrays.<Double>asList(0.2, 0.25, 0.15, 0.10, 0.3)
),
BenchmarkColumnSchema.makeEnumerated(
"dimMultivalEnumerated2",
ValueType.STRING,
false,
3,
null,
Arrays.<Object>asList("Apple", "Orange", "Xylophone", "Corundum", null),
Arrays.<Double>asList(0.2, 0.25, 0.15, 0.10, 0.3)
),
BenchmarkColumnSchema.makeSequential("dimMultivalSequentialWithNulls", ValueType.STRING, false, 8, 0.15, 1, 11),
BenchmarkColumnSchema.makeSequential("dimHyperUnique", ValueType.STRING, false, 1, null, 0, 100000),
BenchmarkColumnSchema.makeSequential("dimNull", ValueType.STRING, false, 1, 1.0, 0, 1),
// metrics
BenchmarkColumnSchema.makeSequential("metLongSequential", ValueType.LONG, true, 1, null, 0, 10000),
BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.0)
);
List<AggregatorFactory> basicSchemaIngestAggs = new ArrayList<>();
basicSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
basicSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
basicSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
basicSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
basicSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
basicSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
Interval basicSchemaDataInterval = new Interval(0, 1000000);
BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
basicSchemaColumns,
basicSchemaIngestAggs,
basicSchemaDataInterval
);
SCHEMA_MAP.put("basic", basicSchema);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair;
import java.util.List;
import java.util.TreeMap;
/*
* EnumeratedDistrubtion's sample() method does a linear scan through the array of probabilities.
*
* This is too slow with high cardinality value sets, so this subclass overrides sample() to use
* a TreeMap instead.
*/
public class EnumeratedTreeDistribution<T> extends EnumeratedDistribution
{
private TreeMap<Double, Integer> probabilityRanges;
private List<Pair<T, Double>> normalizedPmf;
public EnumeratedTreeDistribution(final List<Pair<T, Double>> pmf)
{
super(pmf);
// build the interval tree
probabilityRanges = new TreeMap<Double, Integer>();
normalizedPmf = this.getPmf();
double cumulativep = 0.0;
for (int i = 0; i < normalizedPmf.size(); i++) {
probabilityRanges.put(cumulativep, i);
Pair<T, Double> pair = normalizedPmf.get(i);
cumulativep += pair.getSecond();
}
}
@Override
public T sample()
{
final double randomValue = random.nextDouble();
Integer valueIndex = probabilityRanges.floorEntry(randomValue).getValue();
return normalizedPmf.get(valueIndex).getFirst();
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import org.apache.commons.math3.distribution.AbstractIntegerDistribution;
import org.apache.commons.math3.distribution.AbstractRealDistribution;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair;
import java.util.List;
import java.util.TreeMap;
/*
* Rounds the output values from the sample() function of an AbstractRealDistribution.
*/
public class RealRoundingDistribution extends AbstractIntegerDistribution
{
private AbstractRealDistribution realDist;
public RealRoundingDistribution(AbstractRealDistribution realDist)
{
this.realDist = realDist;
}
@Override
public double probability(int x)
{
return 0;
}
@Override
public double cumulativeProbability(int x)
{
return 0;
}
@Override
public double getNumericalMean()
{
return 0;
}
@Override
public double getNumericalVariance()
{
return 0;
}
@Override
public int getSupportLowerBound()
{
return 0;
}
@Override
public int getSupportUpperBound()
{
return 0;
}
@Override
public boolean isSupportConnected()
{
return false;
}
@Override
public void reseedRandomGenerator(long seed)
{
realDist.reseedRandomGenerator(seed);
}
@Override
public int sample()
{
double randomVal = realDist.sample();
Long longVal = Math.round(randomVal);
return longVal.intValue();
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.datagen;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
public class SequentialDistribution extends EnumeratedDistribution
{
private Integer start;
private Integer end;
private List<Object> enumeratedValues;
private int counter;
public SequentialDistribution(Integer start, Integer end, List<Object> enumeratedValues)
{
// just pass in some bogus probability mass function, we won't be using it
super(Arrays.asList(new Pair<Object, Double>(null, 1.0)));
this.start = start;
this.end = end;
this.enumeratedValues = enumeratedValues;
if (enumeratedValues == null) {
counter = start;
} else {
counter = 0;
}
}
@Override
public Object sample()
{
Object ret;
if (enumeratedValues != null) {
ret = enumeratedValues.get(counter);
counter = (counter + 1) % enumeratedValues.size();
} else {
ret = counter;
counter++;
if (counter >= end) {
counter = start;
}
}
return ret;
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.indexing;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class IncrementalIndexReadBenchmark
{
@Param({"750000"})
private int rowsPerSegment;
@Param({"basic"})
private String schema;
private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
private BenchmarkSchemaInfo schemaInfo;
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
);
incIndex = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void read(Blackhole blackhole) throws Exception
{
IncrementalIndexStorageAdapter sa = new IncrementalIndexStorageAdapter(incIndex);
Sequence<Cursor> cursors = sa.makeCursors(null, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);
Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.<Cursor>newArrayList()).get(0);
List<DimensionSelector> selectors = new ArrayList<>();
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null)));
cursor.reset();
while (!cursor.isDone()) {
for (DimensionSelector selector : selectors) {
IndexedInts row = selector.getRow();
blackhole.consume(selector.lookupName(row.get(0)));
}
cursor.advance();
}
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.indexing;
import com.google.common.hash.Hashing;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class IndexIngestionBenchmark
{
@Param({"75000"})
private int rowsPerSegment;
@Param({"basic"})
private String schema;
private static final Logger log = new Logger(IndexIngestionBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
private ArrayList<InputRow> rows;
private BenchmarkSchemaInfo schemaInfo;
@Setup
public void setup() throws IOException
{
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
rows = new ArrayList<InputRow>();
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
);
for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = gen.nextRow();
if (i % 10000 == 0) {
log.info(i + " rows generated.");
}
rows.add(row);
}
}
@Setup(Level.Iteration)
public void setup2() throws IOException
{
incIndex = makeIncIndex();
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void addRows(Blackhole blackhole) throws Exception
{
for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = rows.get(i);
int rv = incIndex.add(row);
blackhole.consume(rv);
}
}
}

View File

@ -0,0 +1,201 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class IndexMergeBenchmark
{
@Param({"5"})
private int numSegments;
@Param({"75000"})
private int rowsPerSegment;
@Param({"basic"})
private String schema;
private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<QueryableIndex> indexesToMerge;
private BenchmarkSchemaInfo schemaInfo;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + + System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
indexesToMerge = new ArrayList<>();
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
for (int i = 0; i < numSegments; i++) {
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
rowsPerSegment
);
IncrementalIndex incIndex = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
File indexFile = INDEX_MERGER_V9.persist(
incIndex,
tmpFile,
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
indexesToMerge.add(qIndex);
}
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void merge(Blackhole blackhole) throws Exception
{
File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-" + System.currentTimeMillis(), ".TEMPFILE");
tmpFile.delete();
tmpFile.mkdirs();
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();
File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
blackhole.consume(mergedFile);
tmpFile.delete();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void mergeV9(Blackhole blackhole) throws Exception
{
File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-V9-" + System.currentTimeMillis(), ".TEMPFILE");
tmpFile.delete();
tmpFile.mkdirs();
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();
File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
blackhole.consume(mergedFile);
tmpFile.delete();
}
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class IndexPersistBenchmark
{
@Param({"75000"})
private int rowsPerSegment;
@Param({"basic"})
private String schema;
private static final Logger log = new Logger(IndexPersistBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
private ArrayList<InputRow> rows;
private BenchmarkSchemaInfo schemaInfo;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + + System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
rows = new ArrayList<InputRow>();
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
);
for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = gen.nextRow();
if (i % 10000 == 0) {
log.info(i + " rows generated.");
}
rows.add(row);
}
}
@Setup(Level.Iteration)
public void setup2() throws IOException
{
incIndex = makeIncIndex();
for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = rows.get(i);
incIndex.add(row);
}
}
@TearDown(Level.Iteration)
public void teardown() throws IOException
{
incIndex.close();
incIndex = null;
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void persist(Blackhole blackhole) throws Exception
{
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
File indexFile = INDEX_MERGER.persist(
incIndex,
tmpFile,
new IndexSpec()
);
blackhole.consume(indexFile);
tmpFile.delete();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void persistV9(Blackhole blackhole) throws Exception
{
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();;
File indexFile = INDEX_MERGER_V9.persist(
incIndex,
tmpFile,
new IndexSpec()
);
blackhole.consume(indexFile);
tmpFile.delete();
}
}

View File

@ -0,0 +1,358 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class GroupByBenchmark
{
@Param({"4"})
private int numSegments;
@Param({"100000"})
private int rowsPerSegment;
@Param({"basic.A"})
private String schemaAndQuery;
private static final Logger log = new Logger(GroupByBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<IncrementalIndex> incIndexes;
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
private BenchmarkSchemaInfo schemaInfo;
private GroupByQuery query;
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
private static final Map<String, Map<String, GroupByQuery>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
private void setupQueries()
{
// queries for the basic schema
Map<String, GroupByQuery> basicQueries = new LinkedHashMap<>();
BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(new LongSumAggregatorFactory(
"sumLongSequential",
"sumLongSequential"
));
GroupByQuery queryA = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("dimSequential", null),
new DefaultDimensionSpec("dimZipf", null)
//new DefaultDimensionSpec("dimUniform", null),
//new DefaultDimensionSpec("dimSequentialHalfNull", null)
//new DefaultDimensionSpec("dimMultivalEnumerated", null), //multival dims greatly increase the running time, disable for now
//new DefaultDimensionSpec("dimMultivalEnumerated2", null)
))
.setAggregatorSpecs(
queryAggs
)
.setGranularity(QueryGranularities.DAY)
.build();
basicQueries.put("A", queryA);
}
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
executorService = Execs.multiThreaded(numSegments, "GroupByThreadPool");
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + 1,
schemaInfo.getDataInterval(),
rowsPerSegment
);
IncrementalIndex incIndex = makeIncIndex();
incIndexes.add(incIndex);
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
log.info(rowsPerSegment + " rows generated");
}
IncrementalIndex incIndex = incIndexes.get(0);
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
qIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
File indexFile = INDEX_MERGER_V9.persist(
incIndex,
tmpFile,
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
qIndexes.add(qIndex);
}
OffheapBufferPool bufferPool = new OffheapBufferPool(250000000, Integer.MAX_VALUE);
OffheapBufferPool bufferPool2 = new OffheapBufferPool(250000000, Integer.MAX_VALUE);
final GroupByQueryConfig config = new GroupByQueryConfig();
config.setSingleThreaded(false);
config.setMaxIntermediateRows(1000000);
config.setMaxResults(1000000);
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, bufferPool);
factory = new GroupByQueryRunnerFactory(
engine,
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(
configSupplier, JSON_MAPPER, engine, bufferPool2,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
),
bufferPool2
);
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
toolChest
);
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
return Sequences.toList(queryResult, Lists.<T>newArrayList());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception
{
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"incIndex",
new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
);
List<Row> results = GroupByBenchmark.runQuery(factory, runner, query);
for (Row result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
{
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"qIndex",
new QueryableIndexSegment("qIndex", qIndexes.get(0))
);
List<Row> results = GroupByBenchmark.runQuery(factory, runner, query);
for (Row result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
{
List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
QueryToolChest toolChest = factory.getToolchest();
for (int i = 0; i < numSegments; i++) {
String segmentName = "qIndex" + i;
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, qIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
toolChest
)
);
Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
for (Row result : results) {
blackhole.consume(result);
}
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequence;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.segment.Segment;
import java.util.Map;
public class QueryBenchmarkUtil
{
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
String segmentId,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
(QueryToolChest<T, Query<T>>)factory.getToolchest()
);
}
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest) {
return new QueryRunner<T>() {
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return delegate.run(query, responseContext);
}
};
}
};
}
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
}

View File

@ -0,0 +1,342 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.SearchResultValue;
import io.druid.query.search.search.SearchHit;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class SearchBenchmark
{
@Param({"1"})
private int numSegments;
@Param({"750000"})
private int rowsPerSegment;
@Param({"basic.A"})
private String schemaAndQuery;
@Param({"1000"})
private int limit;
private static final Logger log = new Logger(SearchBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<IncrementalIndex> incIndexes;
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
private BenchmarkSchemaInfo schemaInfo;
private Druids.SearchQueryBuilder queryBuilder;
private SearchQuery query;
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
private static final Map<String, Map<String, Druids.SearchQueryBuilder>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
private void setupQueries()
{
// queries for the basic schema
Map<String, Druids.SearchQueryBuilder> basicQueries = new LinkedHashMap<>();
BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"));
queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform"));
queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"));
queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
Druids.SearchQueryBuilder queryBuilderA =
Druids.newSearchQueryBuilder()
.dataSource("blah")
.granularity(QueryGranularities.ALL)
.intervals(intervalSpec)
.query("123");
basicQueries.put("A", queryBuilderA);
}
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
queryBuilder.limit(limit);
query = queryBuilder.build();
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
rowsPerSegment
);
IncrementalIndex incIndex = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
incIndexes.add(incIndex);
}
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
qIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
File indexFile = INDEX_MERGER_V9.persist(
incIndexes.get(i),
tmpFile,
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
qIndexes.add(qIndex);
}
factory = new SearchQueryRunnerFactory(
new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
toolChest
);
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
return Sequences.toList(queryResult, Lists.<T>newArrayList());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception
{
QueryRunner<SearchHit> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"incIndex",
new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
);
List<Result<SearchResultValue>> results = SearchBenchmark.runQuery(factory, runner, query);
List<SearchHit> hits = results.get(0).getValue().getValue();
for (SearchHit hit : hits) {
blackhole.consume(hit);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
{
final QueryRunner<Result<SearchResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"qIndex",
new QueryableIndexSegment("qIndex", qIndexes.get(0))
);
List<Result<SearchResultValue>> results = SearchBenchmark.runQuery(factory, runner, query);
List<SearchHit> hits = results.get(0).getValue().getValue();
for (SearchHit hit : hits) {
blackhole.consume(hit);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
{
List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
QueryToolChest toolChest = factory.getToolchest();
for (int i = 0; i < numSegments; i++) {
String segmentName = "qIndex" + i;
final QueryRunner<Result<SearchResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, qIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
toolChest
)
);
Sequence<Result<SearchResultValue>> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
List<Result<SearchResultValue>> results = Sequences.toList(queryResult, Lists.<Result<SearchResultValue>>newArrayList());
for (Result<SearchResultValue> result : results) {
List<SearchHit> hits = result.getValue().getValue();
for (SearchHit hit : hits) {
blackhole.consume(hit);
}
}
}
}

View File

@ -0,0 +1,386 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.select.EventHolder;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.select.SelectQueryEngine;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.select.SelectQueryRunnerFactory;
import io.druid.query.select.SelectResultValue;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class SelectBenchmark
{
@Param({"1"})
private int numSegments;
@Param({"25000"})
private int rowsPerSegment;
@Param({"basic.A"})
private String schemaAndQuery;
@Param({"1000"})
private int pagingThreshold;
private static final Logger log = new Logger(SelectBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<IncrementalIndex> incIndexes;
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
private BenchmarkSchemaInfo schemaInfo;
private Druids.SelectQueryBuilder queryBuilder;
private SelectQuery query;
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
private static final Map<String, Map<String, Druids.SelectQueryBuilder>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
private void setupQueries()
{
// queries for the basic schema
Map<String, Druids.SelectQueryBuilder> basicQueries = new LinkedHashMap<>();
BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
Druids.SelectQueryBuilder queryBuilderA =
Druids.newSelectQueryBuilder()
.dataSource(new TableDataSource("blah"))
.dimensionSpecs(DefaultDimensionSpec.toSpec(Arrays.<String>asList()))
.metrics(Arrays.<String>asList())
.intervals(intervalSpec)
.granularity(QueryGranularities.ALL)
.descending(false);
basicQueries.put("A", queryBuilderA);
}
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
executorService = Execs.multiThreaded(numSegments, "SelectThreadPool");
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
queryBuilder.pagingSpec(PagingSpec.newSpec(pagingThreshold));
query = queryBuilder.build();
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
rowsPerSegment
);
IncrementalIndex incIndex = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
incIndexes.add(incIndex);
}
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
qIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
File indexFile = INDEX_MERGER_V9.persist(
incIndexes.get(i),
tmpFile,
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
qIndexes.add(qIndex);
}
factory = new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
JSON_MAPPER,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
),
new SelectQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
toolChest
);
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
return Sequences.toList(queryResult, Lists.<T>newArrayList());
}
// don't run this benchmark with a query that doesn't use QueryGranularity.ALL,
// this pagination function probably doesn't work correctly in that case.
private SelectQuery incrementQueryPagination(SelectQuery query, SelectResultValue prevResult)
{
Map<String, Integer> pagingIdentifiers = prevResult.getPagingIdentifiers();
Map<String, Integer> newPagingIdentifers = new HashMap<>();
for (String segmentId : pagingIdentifiers.keySet()) {
int newOffset = pagingIdentifiers.get(segmentId) + 1;
newPagingIdentifers.put(segmentId, newOffset);
}
return query.withPagingSpec(new PagingSpec(newPagingIdentifers, pagingThreshold));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryIncrementalIndex(Blackhole blackhole) throws Exception
{
SelectQuery queryCopy = query.withPagingSpec(PagingSpec.newSpec(pagingThreshold));
String segmentId = "incIndex";
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentId,
new IncrementalIndexSegment(incIndexes.get(0), segmentId)
);
boolean done = false;
while (!done) {
List<Result<SelectResultValue>> results = SelectBenchmark.runQuery(factory, runner, queryCopy);
SelectResultValue result = results.get(0).getValue();
if (result.getEvents().size() == 0) {
done = true;
} else {
for (EventHolder eh : result.getEvents()) {
blackhole.consume(eh);
}
queryCopy = incrementQueryPagination(queryCopy, result);
}
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryQueryableIndex(Blackhole blackhole) throws Exception
{
SelectQuery queryCopy = query.withPagingSpec(PagingSpec.newSpec(pagingThreshold));
String segmentId = "qIndex";
QueryRunner<Result<SelectResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentId,
new QueryableIndexSegment(segmentId, qIndexes.get(0))
);
boolean done = false;
while (!done) {
List<Result<SelectResultValue>> results = SelectBenchmark.runQuery(factory, runner, queryCopy);
SelectResultValue result = results.get(0).getValue();
if (result.getEvents().size() == 0) {
done = true;
} else {
for (EventHolder eh : result.getEvents()) {
blackhole.consume(eh);
}
queryCopy = incrementQueryPagination(queryCopy, result);
}
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
{
SelectQuery queryCopy = query.withPagingSpec(PagingSpec.newSpec(pagingThreshold));
String segmentName;
List<QueryRunner<Result<SelectResultValue>>> singleSegmentRunners = Lists.newArrayList();
QueryToolChest toolChest = factory.getToolchest();
for (int i = 0; i < numSegments; i++) {
segmentName = "qIndex" + i;
QueryRunner<Result<SelectResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, qIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
toolChest
)
);
boolean done = false;
while (!done) {
Sequence<Result<SelectResultValue>> queryResult = theRunner.run(queryCopy, Maps.<String, Object>newHashMap());
List<Result<SelectResultValue>> results = Sequences.toList(queryResult, Lists.<Result<SelectResultValue>>newArrayList());
SelectResultValue result = results.get(0).getValue();
if (result.getEvents().size() == 0) {
done = true;
} else {
for (EventHolder eh : result.getEvents()) {
blackhole.consume(eh);
}
queryCopy = incrementQueryPagination(queryCopy, result);
}
}
}
}

View File

@ -0,0 +1,354 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class TimeseriesBenchmark
{
@Param({"1"})
private int numSegments;
@Param({"750000"})
private int rowsPerSegment;
@Param({"basic.A"})
private String schemaAndQuery;
private static final Logger log = new Logger(TimeseriesBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<IncrementalIndex> incIndexes;
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
private BenchmarkSchemaInfo schemaInfo;
private TimeseriesQuery query;
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
private static final Map<String, Map<String, TimeseriesQuery>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
private void setupQueries()
{
// queries for the basic schema
Map<String, TimeseriesQuery> basicQueries = new LinkedHashMap<>();
BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"));
queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform"));
queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"));
queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
TimeseriesQuery queryA =
Druids.newTimeseriesQueryBuilder()
.dataSource("blah")
.granularity(QueryGranularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
.descending(false)
.build();
basicQueries.put("A", queryA);
}
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
rowsPerSegment
);
IncrementalIndex incIndex = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
log.info(rowsPerSegment + " rows generated");
incIndexes.add(incIndex);
}
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
qIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
File indexFile = INDEX_MERGER_V9.persist(
incIndexes.get(i),
tmpFile,
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
qIndexes.add(qIndex);
}
factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
toolChest
);
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
return Sequences.toList(queryResult, Lists.<T>newArrayList());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception
{
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"incIndex",
new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
);
List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, query);
for (Result<TimeseriesResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
{
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"qIndex",
new QueryableIndexSegment("qIndex", qIndexes.get(0))
);
List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, query);
for (Result<TimeseriesResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryFilteredSingleQueryableIndex(Blackhole blackhole) throws Exception
{
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"qIndex",
new QueryableIndexSegment("qIndex", qIndexes.get(0))
);
DimFilter filter = new SelectorDimFilter("dimSequential", "399", null);
Query filteredQuery = query.withDimFilter(filter);
List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, filteredQuery);
for (Result<TimeseriesResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
{
List<QueryRunner<Result<TimeseriesResultValue>>> singleSegmentRunners = Lists.newArrayList();
QueryToolChest toolChest = factory.getToolchest();
for (int i = 0; i < numSegments; i++) {
String segmentName = "qIndex" + i;
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, qIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
toolChest
)
);
Sequence<Result<TimeseriesResultValue>> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
List<Result<TimeseriesResultValue>> results = Sequences.toList(queryResult, Lists.<Result<TimeseriesResultValue>>newArrayList());
for (Result<TimeseriesResultValue> result : results) {
blackhole.consume(result);
}
}
}

View File

@ -0,0 +1,338 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class TopNBenchmark
{
@Param({"1"})
private int numSegments;
@Param({"750000"})
private int rowsPerSegment;
@Param({"basic.A"})
private String schemaAndQuery;
@Param({"10"})
private int threshold;
private static final Logger log = new Logger(TopNBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<IncrementalIndex> incIndexes;
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
private BenchmarkSchemaInfo schemaInfo;
private TopNQueryBuilder queryBuilder;
private TopNQuery query;
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
private static final Map<String, Map<String, TopNQueryBuilder>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
private void setupQueries()
{
// queries for the basic schema
Map<String, TopNQueryBuilder> basicQueries = new LinkedHashMap<>();
BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"));
queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform"));
queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"));
queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
TopNQueryBuilder queryBuilderA = new TopNQueryBuilder()
.dataSource("blah")
.granularity(QueryGranularities.ALL)
.dimension("dimSequential")
.metric("sumFloatNormal")
.intervals(intervalSpec)
.aggregators(queryAggs);
basicQueries.put("A", queryBuilderA);
}
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
queryBuilder.threshold(threshold);
query = queryBuilder.build();
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
rowsPerSegment
);
IncrementalIndex incIndex = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
incIndexes.add(incIndex);
}
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
qIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
File indexFile = INDEX_MERGER_V9.persist(
incIndexes.get(i),
tmpFile,
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
qIndexes.add(qIndex);
}
factory = new TopNQueryRunnerFactory(
new OffheapBufferPool(250000000, Integer.MAX_VALUE),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
toolChest
);
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
return Sequences.toList(queryResult, Lists.<T>newArrayList());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception
{
QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"incIndex",
new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
);
List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
for (Result<TopNResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
{
final QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"qIndex",
new QueryableIndexSegment("qIndex", qIndexes.get(0))
);
List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
for (Result<TopNResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
{
List<QueryRunner<Result<TopNResultValue>>> singleSegmentRunners = Lists.newArrayList();
QueryToolChest toolChest = factory.getToolchest();
for (int i = 0; i < numSegments; i++) {
String segmentName = "qIndex" + i;
QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, qIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
toolChest
)
);
Sequence<Result<TopNResultValue>> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
List<Result<TopNResultValue>> results = Sequences.toList(queryResult, Lists.<Result<TopNResultValue>>newArrayList());
for (Result<TopNResultValue> result : results) {
blackhole.consume(result);
}
}
}

View File

@ -0,0 +1,444 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import io.druid.benchmark.datagen.BenchmarkColumnSchema;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.data.input.InputRow;
import io.druid.segment.column.ValueType;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// Doesn't assert behavior right now, just generates rows and prints out some distribution numbers
public class BenchmarkDataGeneratorTest
{
@Test
public void testSequential() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeSequential(
"dimA",
ValueType.STRING,
false,
1,
null,
10,
20
)
);
schemas.add(
BenchmarkColumnSchema.makeEnumeratedSequential(
"dimB",
ValueType.STRING,
false,
1,
null,
Arrays.<Object>asList("Hello", "World", "Foo", "Bar")
)
);
schemas.add(
BenchmarkColumnSchema.makeSequential(
"dimC",
ValueType.STRING,
false,
1,
0.50,
30,
40
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("S-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testDiscreteUniform() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeDiscreteUniform(
"dimA",
ValueType.STRING,
false,
1,
null,
10,
20
)
);
schemas.add(
BenchmarkColumnSchema.makeEnumeratedDiscreteUniform(
"dimB",
ValueType.STRING,
false,
4,
null,
Arrays.<Object>asList("Hello", "World", "Foo", "Bar")
)
);
schemas.add(
BenchmarkColumnSchema.makeDiscreteUniform(
"dimC",
ValueType.STRING,
false,
1,
0.50,
10,
20
)
);
schemas.add(
BenchmarkColumnSchema.makeDiscreteUniform(
"dimD",
ValueType.FLOAT,
false,
1,
null,
100,
120
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("U-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testRoundedNormal() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeNormal(
"dimA",
ValueType.FLOAT,
false,
1,
null,
50.0,
1.0,
true
)
);
schemas.add(
BenchmarkColumnSchema.makeNormal(
"dimB",
ValueType.STRING,
false,
1,
null,
1000.0,
10.0,
true
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 1000000; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("N-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testZipf() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeZipf(
"dimA",
ValueType.STRING,
false,
1,
null,
1000,
2000,
1.0
)
);
schemas.add(
BenchmarkColumnSchema.makeZipf(
"dimB",
ValueType.FLOAT,
false,
1,
null,
99990,
99999,
1.0
)
);
schemas.add(
BenchmarkColumnSchema.makeEnumeratedZipf(
"dimC",
ValueType.STRING,
false,
1,
null,
Arrays.<Object>asList("1-Hello", "2-World", "3-Foo", "4-Bar", "5-BA5EBA11", "6-Rocky", "7-Mango", "8-Contango"),
1.0
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("Z-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testEnumerated() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeEnumerated(
"dimA",
ValueType.STRING,
false,
1,
null,
Arrays.<Object>asList("Hello", "World", "Foo", "Bar"),
Arrays.<Double>asList(0.5, 0.25, 0.15, 0.10)
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 10000; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("Z-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testNormal() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeNormal(
"dimA",
ValueType.FLOAT,
false,
1,
null,
8.0,
1.0,
false
)
);
schemas.add(
BenchmarkColumnSchema.makeNormal(
"dimB",
ValueType.STRING,
false,
1,
0.50,
88.0,
2.0,
false
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("N-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testRealUniform() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
RowValueTracker tracker = new RowValueTracker();
schemas.add(
BenchmarkColumnSchema.makeContinuousUniform(
"dimA",
ValueType.STRING,
false,
1,
null,
10.0,
50.0
)
);
schemas.add(
BenchmarkColumnSchema.makeContinuousUniform(
"dimB",
ValueType.FLOAT,
false,
1,
null,
210.0,
250.0
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("U-ROW: " + row);
tracker.addRow(row);
}
tracker.printStuff();
}
@Test
public void testIntervalBasedTimeGeneration() throws Exception {
List<BenchmarkColumnSchema> schemas = new ArrayList<>();
schemas.add(
BenchmarkColumnSchema.makeEnumeratedSequential(
"dimB",
ValueType.STRING,
false,
1,
null,
Arrays.<Object>asList("Hello", "World", "Foo", "Bar")
)
);
BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, new Interval(50000, 600000), 100);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator.nextRow();
//System.out.println("S-ROW: " + row);
}
BenchmarkDataGenerator dataGenerator2 = new BenchmarkDataGenerator(schemas, 9999, new Interval(50000, 50001), 100);
for (int i = 0; i < 100; i++) {
InputRow row = dataGenerator2.nextRow();
//System.out.println("S2-ROW: " + row);
}
}
private class RowValueTracker
{
private Map<String, Map<Object, Integer>> dimensionMap;
public RowValueTracker() {
dimensionMap = new HashMap<>();
}
public void addRow(InputRow row) {
for (String dim : row.getDimensions()) {
if (dimensionMap.get(dim) == null) {
dimensionMap.put(dim, new HashMap<Object, Integer>());
}
Map<Object, Integer> valueMap = dimensionMap.get(dim);
Object dimVals = row.getRaw(dim);
if (dimVals == null) {
dimVals = Collections.singletonList(null);
} else if (!(dimVals instanceof List)) {
dimVals = Collections.singletonList(dimVals);
}
List<Object> dimValsList = (List) dimVals;
for (Object val : dimValsList) {
if (val == null) {
val = "";
}
if (valueMap.get(val) == null) {
valueMap.put(val, 0);
}
valueMap.put(val, valueMap.get(val) + 1);
}
}
}
public void printStuff()
{
System.out.println();
for (String dim : dimensionMap.keySet()) {
System.out.println("DIMENSION " + dim + "\n============");
Map<Object, Integer> valueMap = dimensionMap.get(dim);
List<Comparable> valList = new ArrayList<>();
for (Object val : valueMap.keySet()) {
valList.add((Comparable) val);
}
Collections.sort(valList);
for (Comparable val : valList) {
System.out.println(" VAL: " + val.toString() + " CNT: " + valueMap.get(val));
}
System.out.println();
}
}
}
}

View File

@ -39,6 +39,11 @@ public class GroupByQueryConfig
return singleThreaded; return singleThreaded;
} }
public void setSingleThreaded(boolean singleThreaded)
{
this.singleThreaded = singleThreaded;
}
public int getMaxIntermediateRows() public int getMaxIntermediateRows()
{ {
return maxIntermediateRows; return maxIntermediateRows;
@ -53,4 +58,9 @@ public class GroupByQueryConfig
{ {
return maxResults; return maxResults;
} }
public void setMaxResults(int maxResults)
{
this.maxResults = maxResults;
}
} }