diff --git a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java similarity index 99% rename from benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java rename to benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java index 492fb327464..7e753ad35b0 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularities; + import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -47,7 +48,7 @@ import java.util.Random; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -public class IncrementalIndexAddRowsBenchmark +public class IncrementalIndexRowTypeBenchmark { private IncrementalIndex incIndex; private IncrementalIndex incFloatIndex; diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkColumnSchema.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkColumnSchema.java new file mode 100644 index 00000000000..b477f84add5 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkColumnSchema.java @@ -0,0 +1,429 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.datagen; + +import io.druid.segment.column.ValueType; + +import java.util.List; + +public class BenchmarkColumnSchema +{ + /** + * SEQUENTIAL: Generate integer or enumerated values in sequence. Not random. + * + * DISCRETE_UNIFORM: Discrete uniform distribution, generates integers or enumerated values. + * + * ROUNDED_NORMAL: Discrete distribution that rounds sample values from an underlying normal + * distribution + * + * ZIPF: Discrete Zipf distribution. + * Lower numbers have higher probability. + * Can also generate Zipf distribution from a list of enumerated values. + * + * ENUMERATED: Discrete distribution, generated from lists of values and associated probabilities. + * + * NORMAL: Continuous normal distribution. + * + * UNIFORM: Continuous uniform distribution. + */ + public enum ValueDistribution + { + // discrete distributions + SEQUENTIAL, + DISCRETE_UNIFORM, + ROUNDED_NORMAL, + ZIPF, + ENUMERATED, + + // continuous distributions + UNIFORM, + NORMAL + } + + /** + * Generate values according to this distribution. 
+   */
+  private ValueDistribution distributionType;
+
+  /**
+   * Name of the column.
+   */
+  private String name;
+
+  /**
+   * Value type of this column.
+   */
+  private ValueType type;
+
+  /**
+   * Is this column a metric or dimension?
+   */
+  private boolean isMetric;
+
+  /**
+   * Controls how many values are generated per row (use > 1 for multi-value dimensions).
+   */
+  private int rowSize;
+
+  /**
+   * Probability that a null row will be generated instead of a row with values sampled from the distribution.
+   */
+  private final Double nullProbability;
+
+  /**
+   * When used in discrete distributions, the set of possible values to be generated.
+   */
+  private List<Object> enumeratedValues;
+
+  /**
+   * When using the ENUMERATED distribution, the probabilities associated with the set of values to be generated.
+   * The probabilities in this list must follow the same order as those in enumeratedValues.
+   * Probabilities do not need to sum to 1.0; they will be automatically normalized.
+   */
+  private List<Double> enumeratedProbabilities;
+
+  /**
+   * Range of integer values to generate in the SEQUENTIAL, DISCRETE_UNIFORM, and ZIPF distributions.
+   */
+  private Integer startInt;
+  private Integer endInt;
+
+  /**
+   * Range of double values to generate in the continuous UNIFORM distribution.
+   */
+  private Double startDouble;
+  private Double endDouble;
+
+  /**
+   * Exponent for the ZIPF distribution.
+   */
+  private Double zipfExponent;
+
+  /**
+   * Mean and standard deviation for the NORMAL and ROUNDED_NORMAL distributions.
+   */
+  private Double mean;
+  private Double standardDeviation;
+
+  private BenchmarkColumnSchema(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      ValueDistribution distributionType
+  )
+  {
+    this.name = name;
+    this.type = type;
+    this.isMetric = isMetric;
+    this.distributionType = distributionType;
+    this.rowSize = rowSize;
+    this.nullProbability = nullProbability;
+  }
+
+  public BenchmarkColumnValueGenerator makeGenerator(long seed)
+  {
+    return new BenchmarkColumnValueGenerator(this, seed);
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public Double getNullProbability()
+  {
+    return nullProbability;
+  }
+
+  public ValueType getType()
+  {
+    return type;
+  }
+
+  public boolean isMetric()
+  {
+    return isMetric;
+  }
+
+  public ValueDistribution getDistributionType()
+  {
+    return distributionType;
+  }
+
+  public int getRowSize()
+  {
+    return rowSize;
+  }
+
+  public List<Object> getEnumeratedValues()
+  {
+    return enumeratedValues;
+  }
+
+  public List<Double> getEnumeratedProbabilities()
+  {
+    return enumeratedProbabilities;
+  }
+
+  public Integer getStartInt()
+  {
+    return startInt;
+  }
+
+  public Integer getEndInt()
+  {
+    return endInt;
+  }
+
+  public Double getStartDouble()
+  {
+    return startDouble;
+  }
+
+  public Double getEndDouble()
+  {
+    return endDouble;
+  }
+
+  public Double getZipfExponent()
+  {
+    return zipfExponent;
+  }
+
+  public Double getMean()
+  {
+    return mean;
+  }
+
+  public Double getStandardDeviation()
+  {
+    return standardDeviation;
+  }
+
+  public static BenchmarkColumnSchema makeSequential(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      int startInt,
+      int endInt
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.SEQUENTIAL
+    );
+    schema.startInt = startInt;
+    schema.endInt = endInt;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeEnumeratedSequential(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      List<Object> enumeratedValues
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.SEQUENTIAL
+    );
+    schema.enumeratedValues = enumeratedValues;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeDiscreteUniform(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      int startInt,
+      int endInt
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.DISCRETE_UNIFORM
+    );
+    schema.startInt = startInt;
+    schema.endInt = endInt;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeEnumeratedDiscreteUniform(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      List<Object> enumeratedValues
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.DISCRETE_UNIFORM
+    );
+    schema.enumeratedValues = enumeratedValues;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeContinuousUniform(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      double startDouble,
+      double endDouble
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.UNIFORM
+    );
+    schema.startDouble = startDouble;
+    schema.endDouble = endDouble;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeNormal(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      Double mean,
+      Double standardDeviation,
+      boolean useRounding
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        useRounding ? ValueDistribution.ROUNDED_NORMAL : ValueDistribution.NORMAL
+    );
+    schema.mean = mean;
+    schema.standardDeviation = standardDeviation;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeZipf(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      int startInt,
+      int endInt,
+      Double zipfExponent
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.ZIPF
+    );
+    schema.startInt = startInt;
+    schema.endInt = endInt;
+    schema.zipfExponent = zipfExponent;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeEnumeratedZipf(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      List<Object> enumeratedValues,
+      Double zipfExponent
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.ZIPF
+    );
+    schema.enumeratedValues = enumeratedValues;
+    schema.zipfExponent = zipfExponent;
+    return schema;
+  }
+
+  public static BenchmarkColumnSchema makeEnumerated(
+      String name,
+      ValueType type,
+      boolean isMetric,
+      int rowSize,
+      Double nullProbability,
+      List<Object> enumeratedValues,
+      List<Double> enumeratedProbabilities
+  )
+  {
+    BenchmarkColumnSchema schema = new BenchmarkColumnSchema(
+        name,
+        type,
+        isMetric,
+        rowSize,
+        nullProbability,
+        ValueDistribution.ENUMERATED
+    );
+    schema.enumeratedValues = enumeratedValues;
+    schema.enumeratedProbabilities = enumeratedProbabilities;
+    return schema;
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkColumnValueGenerator.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkColumnValueGenerator.java
new file mode 100644
index 00000000000..966021b483b
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkColumnValueGenerator.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import io.druid.segment.column.ValueType;
+import org.apache.commons.math3.distribution.AbstractIntegerDistribution;
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class BenchmarkColumnValueGenerator
+{
+  private final BenchmarkColumnSchema schema;
+  private final long seed;
+
+  private Serializable distribution;
+  private Random simpleRng;
+
+  public BenchmarkColumnValueGenerator(
+      BenchmarkColumnSchema schema,
+      long seed
+  )
+  {
+    this.schema = schema;
+    this.seed = seed;
+
+    simpleRng = new Random(seed);
+    initDistribution();
+  }
+
+  public Object generateRowValue()
+  {
+    Double nullProbability = schema.getNullProbability();
+    int rowSize = schema.getRowSize();
+
+    if (nullProbability != null) {
+      double randDouble = simpleRng.nextDouble();
+      if (randDouble <= nullProbability) {
+        return null;
+      }
+    }
+
+    if (rowSize == 1) {
+      return generateSingleRowValue();
+    } else {
+      List<Object> rowVals = new ArrayList<>(rowSize);
+      for (int i = 0; i < rowSize; i++) {
+        rowVals.add(generateSingleRowValue());
+      }
+      return rowVals;
+    }
+  }
+
+  public BenchmarkColumnSchema getSchema()
+  {
+    return schema;
+  }
+
+  public long getSeed()
+  {
+    return seed;
+  }
+
+  private Object generateSingleRowValue()
+  {
+    Object ret = null;
+    ValueType type = schema.getType();
+
+    if (distribution instanceof AbstractIntegerDistribution) {
+      ret = ((AbstractIntegerDistribution) distribution).sample();
+    } else if (distribution instanceof AbstractRealDistribution) {
+      ret = ((AbstractRealDistribution) distribution).sample();
+    } else if (distribution instanceof EnumeratedDistribution) {
+      ret = ((EnumeratedDistribution) distribution).sample();
+    }
+
+    ret = convertType(ret, type);
+    return ret;
+  }
+
+  private Object convertType(Object input, ValueType type)
+  {
+    if (input == null) {
+      return null;
+    }
+
+    Object ret;
+    switch (type) {
+      case STRING:
+        ret = input.toString();
+        break;
+      case LONG:
+        if (input instanceof Number) {
+          ret = ((Number) input).longValue();
+        } else {
+          ret = Long.parseLong(input.toString());
+        }
+        break;
+      case FLOAT:
+        if (input instanceof Number) {
+          ret = ((Number) input).floatValue();
+        } else {
+          ret = Float.parseFloat(input.toString());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown data type: " + type);
+    }
+    return ret;
+  }
+
+  private void initDistribution()
+  {
+    BenchmarkColumnSchema.ValueDistribution distributionType = schema.getDistributionType();
+    ValueType type = schema.getType();
+    List<Object> enumeratedValues = schema.getEnumeratedValues();
+    List<Double> enumeratedProbabilities = schema.getEnumeratedProbabilities();
+    List<Pair<Object, Double>> probabilities = new ArrayList<>();
+
+    switch (distributionType) {
+      case SEQUENTIAL:
+        // not random, just cycle through numbers from start to end, or cycle through enumerated values if provided
+        distribution = new SequentialDistribution(
+            schema.getStartInt(),
+            schema.getEndInt(),
+            schema.getEnumeratedValues()
+        );
+        break;
+      case UNIFORM:
+        distribution = new UniformRealDistribution(schema.getStartDouble(), schema.getEndDouble());
+        break;
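+      // As an illustrative (hypothetical) example, a column declared with
+      //   BenchmarkColumnSchema.makeDiscreteUniform("dimExample", ValueType.STRING, false, 1, null, 0, 100)
+      // reaches the case below with no enumerated values, so the integers [0, 100) are
+      // materialized and given identical weights, which EnumeratedDistribution normalizes to sum to 1.0.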
+      case DISCRETE_UNIFORM:
+        if (enumeratedValues == null) {
+          enumeratedValues = new ArrayList<>();
+          for (int i = schema.getStartInt(); i < schema.getEndInt(); i++) {
+            Object val = convertType(i, type);
+            enumeratedValues.add(val);
+          }
+        }
+        // give them all equal probability, the library will normalize probabilities to sum to 1.0
+        for (int i = 0; i < enumeratedValues.size(); i++) {
+          probabilities.add(new Pair<>(enumeratedValues.get(i), 0.1));
+        }
+        distribution = new EnumeratedTreeDistribution<>(probabilities);
+        break;
+      case NORMAL:
+        distribution = new NormalDistribution(schema.getMean(), schema.getStandardDeviation());
+        break;
+      case ROUNDED_NORMAL:
+        NormalDistribution normalDist = new NormalDistribution(schema.getMean(), schema.getStandardDeviation());
+        distribution = new RealRoundingDistribution(normalDist);
+        break;
+      case ZIPF:
+        int cardinality;
+        // ZipfDistribution's support is {1, ..., cardinality}, so probabilities are sampled at i + 1
+        if (enumeratedValues == null) {
+          Integer startInt = schema.getStartInt();
+          cardinality = schema.getEndInt() - startInt;
+          ZipfDistribution zipf = new ZipfDistribution(cardinality, schema.getZipfExponent());
+          for (int i = 0; i < cardinality; i++) {
+            probabilities.add(new Pair<>((Object) (i + startInt), zipf.probability(i + 1)));
+          }
+        } else {
+          cardinality = enumeratedValues.size();
+          ZipfDistribution zipf = new ZipfDistribution(enumeratedValues.size(), schema.getZipfExponent());
+          for (int i = 0; i < cardinality; i++) {
+            probabilities.add(new Pair<>(enumeratedValues.get(i), zipf.probability(i + 1)));
+          }
+        }
+        distribution = new EnumeratedTreeDistribution<>(probabilities);
+        break;
+      case ENUMERATED:
+        for (int i = 0; i < enumeratedValues.size(); i++) {
+          probabilities.add(new Pair<>(enumeratedValues.get(i), enumeratedProbabilities.get(i)));
+        }
+        distribution = new EnumeratedTreeDistribution<>(probabilities);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown distribution type: " + distributionType);
+    }
+
+    if (distribution instanceof AbstractIntegerDistribution) {
+      ((AbstractIntegerDistribution) distribution).reseedRandomGenerator(seed);
+    } else if (distribution instanceof AbstractRealDistribution) {
+      ((AbstractRealDistribution) distribution).reseedRandomGenerator(seed);
+    } else if (distribution instanceof EnumeratedDistribution) {
+      ((EnumeratedDistribution) distribution).reseedRandomGenerator(seed);
+    }
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkDataGenerator.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkDataGenerator.java
new file mode 100644
index 00000000000..40cd443c980
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkDataGenerator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BenchmarkDataGenerator
+{
+  private final List<BenchmarkColumnSchema> columnSchemas;
+  private final long seed;
+
+  private List<BenchmarkColumnValueGenerator> columnGenerators;
+  private final long startTime;
+  private final long endTime;
+  private final int numConsecutiveTimestamps;
+  private final double timestampIncrement;
+
+  private double currentTime;
+  private int timeCounter;
+  private List<String> dimensionNames;
+
+  public BenchmarkDataGenerator(
+      List<BenchmarkColumnSchema> columnSchemas,
+      final long seed,
+      long startTime,
+      int numConsecutiveTimestamps,
+      Double timestampIncrement
+  )
+  {
+    this.columnSchemas = columnSchemas;
+    this.seed = seed;
+
+    this.startTime = startTime;
+    this.endTime = Long.MAX_VALUE;
+    this.numConsecutiveTimestamps = numConsecutiveTimestamps;
+    this.timestampIncrement = timestampIncrement;
+    this.currentTime = startTime;
+
+    init();
+  }
+
+  public BenchmarkDataGenerator(
+      List<BenchmarkColumnSchema> columnSchemas,
+      final long seed,
+      Interval interval,
+      int numRows
+  )
+  {
+    this.columnSchemas = columnSchemas;
+    this.seed = seed;
+
+    this.startTime = interval.getStartMillis();
+    this.endTime = interval.getEndMillis();
+
+    long timeDelta = endTime - startTime;
+    this.timestampIncrement = timeDelta / (numRows * 1.0);
+    this.numConsecutiveTimestamps = 0;
+
+    init();
+  }
+
+  public InputRow nextRow()
+  {
+    Map<String, Object> event = new HashMap<>();
+    for (BenchmarkColumnValueGenerator generator : columnGenerators) {
+      event.put(generator.getSchema().getName(), generator.generateRowValue());
+    }
+    MapBasedInputRow row = new MapBasedInputRow(nextTimestamp(), dimensionNames, event);
+    return row;
+  }
+
+  private void init()
+  {
+    this.timeCounter = 0;
+    this.currentTime = startTime;
+
+    dimensionNames = new ArrayList<>();
+    for (BenchmarkColumnSchema schema : columnSchemas) {
+      if (!schema.isMetric()) {
+        dimensionNames.add(schema.getName());
+      }
+    }
+
+    columnGenerators = new ArrayList<>();
+    columnGenerators.addAll(
+        Lists.transform(
+            columnSchemas,
+            new Function<BenchmarkColumnSchema, BenchmarkColumnValueGenerator>()
+            {
+              @Override
+              public BenchmarkColumnValueGenerator apply(
+                  BenchmarkColumnSchema input
+              )
+              {
+                return input.makeGenerator(seed);
+              }
+            }
+        )
+    );
+  }
+
+  private long nextTimestamp()
+  {
+    timeCounter += 1;
+    if (timeCounter > numConsecutiveTimestamps) {
+      currentTime += timestampIncrement;
+      timeCounter = 0;
+    }
+    long newMillis = Math.round(currentTime);
+    if (newMillis > endTime) {
+      return endTime;
+    } else {
+      return newMillis;
+    }
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemaInfo.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemaInfo.java
new file mode 100644
index 00000000000..0a81f7ee990
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemaInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import io.druid.query.aggregation.AggregatorFactory;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+public class BenchmarkSchemaInfo
+{
+  private List<BenchmarkColumnSchema> columnSchemas;
+  private List<AggregatorFactory> aggs;
+  private Interval dataInterval;
+
+  public BenchmarkSchemaInfo(
+      List<BenchmarkColumnSchema> columnSchemas,
+      List<AggregatorFactory> aggs,
+      Interval dataInterval
+  )
+  {
+    this.columnSchemas = columnSchemas;
+    this.aggs = aggs;
+    this.dataInterval = dataInterval;
+  }
+
+  public List<BenchmarkColumnSchema> getColumnSchemas()
+  {
+    return columnSchemas;
+  }
+
+  public List<AggregatorFactory> getAggs()
+  {
+    return aggs;
+  }
+
+  public AggregatorFactory[] getAggsArray()
+  {
+    return aggs.toArray(new AggregatorFactory[0]);
+  }
+
+  public Interval getDataInterval()
+  {
+    return dataInterval;
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
new file mode 100644
index 00000000000..4222199a264
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import com.google.common.collect.ImmutableList;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.DoubleMinAggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongMaxAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.column.ValueType;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BenchmarkSchemas
+{
+  public static final Map<String, BenchmarkSchemaInfo> SCHEMA_MAP = new LinkedHashMap<>();
+
+  static { // basic schema
+    List<BenchmarkColumnSchema> basicSchemaColumns = ImmutableList.of(
+        // dims
+        BenchmarkColumnSchema.makeSequential("dimSequential", ValueType.STRING, false, 1, null, 0, 1000),
+        BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 101, 1.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 1000000),
+        BenchmarkColumnSchema.makeSequential("dimSequentialHalfNull", ValueType.STRING, false, 1, 0.5, 0, 1000),
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimMultivalEnumerated",
+            ValueType.STRING,
+            false,
+            4,
+            null,
+            Arrays.<Object>asList("Hello", "World", "Foo", "Bar", "Baz"),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimMultivalEnumerated2",
+            ValueType.STRING,
+            false,
+            3,
+            null,
+            Arrays.<Object>asList("Apple", "Orange", "Xylophone", "Corundum", null),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeSequential("dimMultivalSequentialWithNulls", ValueType.STRING, false, 8, 0.15, 1, 11),
+        BenchmarkColumnSchema.makeSequential("dimHyperUnique", ValueType.STRING, false, 1, null, 0, 100000),
+        BenchmarkColumnSchema.makeSequential("dimNull", ValueType.STRING, false, 1, 1.0, 0, 1),
+
+        // metrics
+        BenchmarkColumnSchema.makeSequential("metLongSequential", ValueType.LONG, true, 1, null, 0, 10000),
+        BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
+        BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
+        BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.0)
+    );
+
+    List<AggregatorFactory> basicSchemaIngestAggs = new ArrayList<>();
+    basicSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
+    basicSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
+    basicSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+    basicSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
+    basicSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+    basicSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
+    Interval basicSchemaDataInterval = new Interval(0, 1000000);
+
+    BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
+        basicSchemaColumns,
+        basicSchemaIngestAggs,
+        basicSchemaDataInterval
+    );
+    SCHEMA_MAP.put("basic", basicSchema);
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/EnumeratedTreeDistribution.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/EnumeratedTreeDistribution.java
new file mode 100644
index 00000000000..5cab5af7ee7
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/EnumeratedTreeDistribution.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import java.util.List;
+import java.util.TreeMap;
+
+/*
+ * EnumeratedDistribution's sample() method does a linear scan through the array of probabilities.
+ *
+ * This is too slow with high cardinality value sets, so this subclass overrides sample() to use
+ * a TreeMap instead.
+ */
+public class EnumeratedTreeDistribution<T> extends EnumeratedDistribution<T>
+{
+  private TreeMap<Double, Integer> probabilityRanges;
+  private List<Pair<T, Double>> normalizedPmf;
+
+  public EnumeratedTreeDistribution(final List<Pair<T, Double>> pmf)
+  {
+    super(pmf);
+
+    // build a map from cumulative probability to value index, so sample() can resolve
+    // a uniform random draw with a single log-time floorEntry() lookup
+    probabilityRanges = new TreeMap<>();
+    normalizedPmf = this.getPmf();
+    double cumulativeP = 0.0;
+    for (int i = 0; i < normalizedPmf.size(); i++) {
+      probabilityRanges.put(cumulativeP, i);
+      Pair<T, Double> pair = normalizedPmf.get(i);
+      cumulativeP += pair.getSecond();
+    }
+  }
+
+  @Override
+  public T sample()
+  {
+    final double randomValue = random.nextDouble();
+    Integer valueIndex = probabilityRanges.floorEntry(randomValue).getValue();
+    return normalizedPmf.get(valueIndex).getFirst();
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/RealRoundingDistribution.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/RealRoundingDistribution.java
new file mode 100644
index 00000000000..31409211ac4
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/RealRoundingDistribution.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import org.apache.commons.math3.distribution.AbstractIntegerDistribution;
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+/*
+ * Rounds the output values from the sample() function of an AbstractRealDistribution.
+ */
+public class RealRoundingDistribution extends AbstractIntegerDistribution
+{
+  private AbstractRealDistribution realDist;
+
+  public RealRoundingDistribution(AbstractRealDistribution realDist)
+  {
+    this.realDist = realDist;
+  }
+
+  // Only sample() and reseedRandomGenerator() are used by the benchmark data generator;
+  // the remaining distribution functions are left as unimplemented stubs.
+  @Override
+  public double probability(int x)
+  {
+    return 0;
+  }
+
+  @Override
+  public double cumulativeProbability(int x)
+  {
+    return 0;
+  }
+
+  @Override
+  public double getNumericalMean()
+  {
+    return 0;
+  }
+
+  @Override
+  public double getNumericalVariance()
+  {
+    return 0;
+  }
+
+  @Override
+  public int getSupportLowerBound()
+  {
+    return 0;
+  }
+
+  @Override
+  public int getSupportUpperBound()
+  {
+    return 0;
+  }
+
+  @Override
+  public boolean isSupportConnected()
+  {
+    return false;
+  }
+
+  @Override
+  public void reseedRandomGenerator(long seed)
+  {
+    realDist.reseedRandomGenerator(seed);
+  }
+
+  @Override
+  public int sample()
+  {
+    double randomVal = realDist.sample();
+    return (int) Math.round(randomVal);
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/SequentialDistribution.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/SequentialDistribution.java
new file mode 100644
index 00000000000..7782edf0263
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/SequentialDistribution.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.datagen;
+
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class SequentialDistribution extends EnumeratedDistribution<Object>
+{
+  private Integer start;
+  private Integer end;
+  private List<Object> enumeratedValues;
+  private int counter;
+
+  public SequentialDistribution(Integer start, Integer end, List<Object> enumeratedValues)
+  {
+    // just pass in some bogus probability mass function, we won't be using it
+    super(Arrays.asList(new Pair<Object, Double>(null, 1.0)));
+    this.start = start;
+    this.end = end;
+    this.enumeratedValues = enumeratedValues;
+    if (enumeratedValues == null) {
+      counter = start;
+    } else {
+      counter = 0;
+    }
+  }
+
+  @Override
+  public Object sample()
+  {
+    Object ret;
+    if (enumeratedValues != null) {
+      ret = enumeratedValues.get(counter);
+      counter = (counter + 1) % enumeratedValues.size();
+    } else {
+      ret = counter;
+      counter++;
+      if (counter >= end) {
+        counter = start;
+      }
+    }
+    return ret;
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
new file mode 100644
index 00000000000..e64775809f1
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.indexing;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import com.metamx.common.logger.Logger;
+import io.druid.benchmark.datagen.BenchmarkDataGenerator;
+import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import io.druid.benchmark.datagen.BenchmarkSchemas;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.segment.Cursor;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import io.druid.segment.serde.ComplexMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(jvmArgsPrepend = "-server", value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class IncrementalIndexReadBenchmark
+{
+  @Param({"750000"})
+  private int rowsPerSegment;
+
+  @Param({"basic"})
+  private String schema;
+
+  private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
+  private static final int RNG_SEED = 9999;
+  private IncrementalIndex incIndex;
+
+  private BenchmarkSchemaInfo schemaInfo;
+
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+    }
+
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+
+    BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
+
+    incIndex = makeIncIndex();
+
+    for (int j = 0; j < rowsPerSegment; j++) {
+      InputRow row = gen.nextRow();
+      if (j % 10000 == 0) {
+        log.info(j + " rows generated.");
+      }
+      incIndex.add(row);
+    }
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new OnheapIncrementalIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(QueryGranularities.NONE)
+            .withMetrics(schemaInfo.getAggsArray())
+            .withDimensionsSpec(new DimensionsSpec(null, null, null))
+            .build(),
+        true,
+        false,
+        true,
+        rowsPerSegment
+    );
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void read(Blackhole blackhole) throws Exception
+  {
+    IncrementalIndexStorageAdapter sa = new IncrementalIndexStorageAdapter(incIndex);
+    Sequence<Cursor> cursors = sa.makeCursors(null, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);
+    Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.<Cursor>newArrayList()).get(0);
+
+    List<DimensionSelector> selectors = new ArrayList<>();
+    selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)));
+    selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null)));
+    selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null)));
+    selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null)));
+
+    cursor.reset();
+    while (!cursor.isDone()) {
+      for (DimensionSelector selector : selectors) {
+        IndexedInts row = selector.getRow();
+        blackhole.consume(selector.lookupName(row.get(0)));
+      }
+      cursor.advance();
+    }
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java
new file mode 100644
index 00000000000..5756d5ebf0e
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.indexing;
+
+import com.google.common.hash.Hashing;
+import com.metamx.common.logger.Logger;
+import io.druid.benchmark.datagen.BenchmarkDataGenerator;
+import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import io.druid.benchmark.datagen.BenchmarkSchemas;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import io.druid.segment.serde.ComplexMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(jvmArgsPrepend = "-server", value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class IndexIngestionBenchmark
+{
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"basic"})
+  private String schema;
+
+  private static final Logger log = new Logger(IndexIngestionBenchmark.class);
+  private static final int RNG_SEED = 9999;
+
+  private IncrementalIndex incIndex;
+  private ArrayList<InputRow> rows;
+  private BenchmarkSchemaInfo schemaInfo;
+
+  @Setup
+  public void setup() throws IOException
+  {
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+    }
+
+    rows = new ArrayList<>();
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+
+    BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
+
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = gen.nextRow();
+      if (i % 10000 == 0) {
+        log.info(i + " rows generated.");
+      }
+      rows.add(row);
+    }
+  }
+
+  @Setup(Level.Iteration)
+  public void setup2() throws IOException
+  {
+    incIndex = makeIncIndex();
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new OnheapIncrementalIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(QueryGranularities.NONE)
+            .withMetrics(schemaInfo.getAggsArray())
+            .withDimensionsSpec(new DimensionsSpec(null, null, null))
+            .build(),
+        true,
+        false,
+        true,
+        rowsPerSegment
+    );
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void addRows(Blackhole blackhole) throws Exception
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = rows.get(i);
+      int rv = incIndex.add(row);
+      blackhole.consume(rv);
+    }
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java
new file mode 100644
index 00000000000..462b39882a5
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import com.metamx.common.logger.Logger;
+import io.druid.benchmark.datagen.BenchmarkDataGenerator;
+import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import io.druid.benchmark.datagen.BenchmarkSchemas;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import io.druid.segment.serde.ComplexMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(jvmArgsPrepend = "-server", value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class IndexMergeBenchmark
+{
+  @Param({"5"})
+  private int numSegments;
+
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"basic"})
+  private String schema;
+
+  private static final Logger log = new Logger(IndexMergeBenchmark.class);
+  private static final int RNG_SEED = 9999;
+  private static final IndexMerger INDEX_MERGER;
+  private static final IndexMergerV9 INDEX_MERGER_V9;
+  private static final IndexIO INDEX_IO;
+  public static final ObjectMapper JSON_MAPPER;
+
+  private List<QueryableIndex> indexesToMerge;
+  private BenchmarkSchemaInfo schemaInfo;
+
+  static {
+    JSON_MAPPER = new DefaultObjectMapper();
+    INDEX_IO = new IndexIO(
+        JSON_MAPPER,
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0;
+          }
+        }
+    );
+    INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
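+    // Both mergers are built against the same IndexIO instance, so the merge() and
+    // mergeV9() benchmarks below operate on identical segment inputs.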
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
+  }
+
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+    }
+
+    indexesToMerge = new ArrayList<>();
+
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+
+    for (int i = 0; i < numSegments; i++) {
+      BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+          schemaInfo.getColumnSchemas(),
+          RNG_SEED + i,
+          schemaInfo.getDataInterval(),
+          rowsPerSegment
+      );
+
+      IncrementalIndex incIndex = makeIncIndex();
+
+      for (int j = 0; j < rowsPerSegment; j++) {
+        InputRow row = gen.nextRow();
+        if (j % 10000 == 0) {
+          log.info(j + " rows generated.");
+        }
+        incIndex.add(row);
+      }
+
+      File tmpFile = Files.createTempDir();
+      log.info("Using temp dir: " + tmpFile.getAbsolutePath());
+      tmpFile.deleteOnExit();
+
+      File indexFile = INDEX_MERGER_V9.persist(
+          incIndex,
+          tmpFile,
+          new IndexSpec()
+      );
+
+      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
+      indexesToMerge.add(qIndex);
+    }
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new OnheapIncrementalIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(QueryGranularities.NONE)
+            .withMetrics(schemaInfo.getAggsArray())
+            .withDimensionsSpec(new DimensionsSpec(null, null, null))
+            .build(),
+        true,
+        false,
+        true,
+        rowsPerSegment
+    );
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void merge(Blackhole blackhole) throws Exception
+  {
+    File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-" + System.currentTimeMillis(), ".TEMPFILE");
+    tmpFile.delete();
+    tmpFile.mkdirs();
+    log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
+    tmpFile.deleteOnExit();
+
+    File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
+
+    blackhole.consume(mergedFile);
+
+    tmpFile.delete();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void mergeV9(Blackhole blackhole) throws Exception
+  {
+    File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-V9-" + System.currentTimeMillis(), ".TEMPFILE");
+    tmpFile.delete();
+    tmpFile.mkdirs();
+    log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
+    tmpFile.deleteOnExit();
+
+    File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
+
+    blackhole.consume(mergedFile);
+
+    tmpFile.delete();
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
new file mode 100644
index 00000000000..e79e492772f
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import com.metamx.common.logger.Logger;
+import io.druid.benchmark.datagen.BenchmarkDataGenerator;
+import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import io.druid.benchmark.datagen.BenchmarkSchemas;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import io.druid.segment.serde.ComplexMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(jvmArgsPrepend = "-server", value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class IndexPersistBenchmark
+{
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"basic"})
+  private String schema;
+
+  private static final Logger log = new Logger(IndexPersistBenchmark.class);
+  private static final int RNG_SEED = 9999;
+
+  private IncrementalIndex incIndex;
+  private ArrayList<InputRow> rows;
+  private BenchmarkSchemaInfo schemaInfo;
+
+  private static final IndexMerger INDEX_MERGER;
+  private static final IndexMergerV9 INDEX_MERGER_V9;
+  private static final IndexIO INDEX_IO;
+  public static final ObjectMapper JSON_MAPPER;
+
+  static {
+    JSON_MAPPER = new DefaultObjectMapper();
+    INDEX_IO = new IndexIO(
+        JSON_MAPPER,
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0;
+          }
+        }
+    );
+    INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
+  }
+
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+    }
+
+    rows = new ArrayList<>();
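+    // The generated rows are kept in this list; setup2() below rebuilds the incremental
+    // index from it before every iteration, so each persist call starts from identical data.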
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+
+    BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
+
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = gen.nextRow();
+      if (i % 10000 == 0) {
+        log.info(i + " rows generated.");
+      }
+      rows.add(row);
+    }
+  }
+
+  @Setup(Level.Iteration)
+  public void setup2() throws IOException
+  {
+    incIndex = makeIncIndex();
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = rows.get(i);
+      incIndex.add(row);
+    }
+  }
+
+  @TearDown(Level.Iteration)
+  public void teardown() throws IOException
+  {
+    incIndex.close();
+    incIndex = null;
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new OnheapIncrementalIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(QueryGranularities.NONE)
+            .withMetrics(schemaInfo.getAggsArray())
+            .withDimensionsSpec(new DimensionsSpec(null, null, null))
+            .build(),
+        true,
+        false,
+        true,
+        rowsPerSegment
+    );
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void persist(Blackhole blackhole) throws Exception
+  {
+    File tmpFile = Files.createTempDir();
+    log.info("Using temp dir: " + tmpFile.getAbsolutePath());
+    tmpFile.deleteOnExit();
+
+    File indexFile = INDEX_MERGER.persist(
+        incIndex,
+        tmpFile,
+        new IndexSpec()
+    );
+
+    blackhole.consume(indexFile);
+
+    tmpFile.delete();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void persistV9(Blackhole blackhole) throws Exception
+  {
+    File tmpFile = Files.createTempDir();
+    log.info("Using temp dir: " + tmpFile.getAbsolutePath());
+    tmpFile.deleteOnExit();
+
+    File indexFile = INDEX_MERGER_V9.persist(
+        incIndex,
+        tmpFile,
+        new IndexSpec()
+    );
+
+    blackhole.consume(indexFile);
+
+    tmpFile.delete();
+  }
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java
new file mode 100644
index 00000000000..693653c5f4e
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package io.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.benchmark.datagen.BenchmarkDataGenerator; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.offheap.OffheapBufferPool; +import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryEngine; +import io.druid.query.groupby.GroupByQueryQueryToolChest; +import io.druid.query.groupby.GroupByQueryRunnerFactory; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMergerV9; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.serde.ComplexMetrics; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(jvmArgsPrepend = "-server", value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class GroupByBenchmark +{ + @Param({"4"}) + private int numSegments; + + @Param({"100000"}) + private int rowsPerSegment; + + @Param({"basic.A"}) + private String schemaAndQuery; + + private static final Logger log = new Logger(GroupByBenchmark.class); + private static final int RNG_SEED = 9999; + private 
static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + private List incIndexes; + private List qIndexes; + + private QueryRunnerFactory factory; + + private BenchmarkSchemaInfo schemaInfo; + private GroupByQuery query; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + INDEX_IO = new IndexIO( + JSON_MAPPER, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO); + } + + private static final Map> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + Map basicQueries = new LinkedHashMap<>(); + BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + { // basic.A + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval())); + List queryAggs = new ArrayList<>(); + queryAggs.add(new LongSumAggregatorFactory( + "sumLongSequential", + "sumLongSequential" + )); + GroupByQuery queryA = GroupByQuery + .builder() + .setDataSource("blah") + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimSequential", null), + new DefaultDimensionSpec("dimZipf", null) + //new DefaultDimensionSpec("dimUniform", null), + //new DefaultDimensionSpec("dimSequentialHalfNull", null) + //new DefaultDimensionSpec("dimMultivalEnumerated", null), //multival dims greatly increase the running time, disable for now + //new DefaultDimensionSpec("dimMultivalEnumerated2", null) + )) + .setAggregatorSpecs( + queryAggs + ) + .setGranularity(QueryGranularities.DAY) + .build(); + + basicQueries.put("A", queryA); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + +System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128())); + } + executorService = Execs.multiThreaded(numSegments, "GroupByThreadPool"); + + setupQueries(); + + String[] schemaQuery = schemaAndQuery.split("\\."); + String schemaName = schemaQuery[0]; + String queryName = schemaQuery[1]; + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + log.info("Generating rows for segment " + i); + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + 1, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + incIndexes.add(incIndex); + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row); + } + log.info(rowsPerSegment + " rows generated"); + + } + + IncrementalIndex incIndex = incIndexes.get(0); + + File tmpFile = Files.createTempDir(); + log.info("Using temp dir: " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndex, + tmpFile, + new IndexSpec() + ); + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + qIndexes.add(qIndex); + } + + OffheapBufferPool bufferPool = new 
OffheapBufferPool(250000000, Integer.MAX_VALUE); + OffheapBufferPool bufferPool2 = new OffheapBufferPool(250000000, Integer.MAX_VALUE); + final GroupByQueryConfig config = new GroupByQueryConfig(); + config.setSingleThreaded(false); + config.setMaxIntermediateRows(1000000); + config.setMaxResults(1000000); + + final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, bufferPool); + + factory = new GroupByQueryRunnerFactory( + engine, + QueryBenchmarkUtil.NOOP_QUERYWATCHER, + configSupplier, + new GroupByQueryQueryToolChest( + configSupplier, JSON_MAPPER, engine, bufferPool2, + QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() + ), + bufferPool2 + ); + } + + private IncrementalIndex makeIncIndex() + { + return new OnheapIncrementalIndex( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(QueryGranularities.NONE) + .withMetrics(schemaInfo.getAggsArray()) + .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .build(), + true, + false, + true, + rowsPerSegment + ); + } + + private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) + { + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), + toolChest + ); + + Sequence queryResult = theRunner.run(query, Maps.newHashMap()); + return Sequences.toList(queryResult, Lists.newArrayList()); + } + + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception + { + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "incIndex", + new IncrementalIndexSegment(incIndexes.get(0), "incIndex") + ); + + List results = GroupByBenchmark.runQuery(factory, runner, query); + + for (Row result : results) { + blackhole.consume(result); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleQueryableIndex(Blackhole blackhole) throws Exception + { + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "qIndex", + new QueryableIndexSegment("qIndex", qIndexes.get(0)) + ); + + List results = GroupByBenchmark.runQuery(factory, runner, query); + + for (Row result : results) { + blackhole.consume(result); + } + } + + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception + { + List> singleSegmentRunners = Lists.newArrayList(); + QueryToolChest toolChest = factory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentName, + new QueryableIndexSegment(segmentName, qIndexes.get(i)) + ); + singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner)); + } + + QueryRunner theRunner = toolChest.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)), + toolChest + ) + ); + + Sequence queryResult = theRunner.run(query, Maps.newHashMap()); + List results = Sequences.toList(queryResult, Lists.newArrayList()); + + for (Row result : results) { + blackhole.consume(result); + } + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java 
b/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java
new file mode 100644
index 00000000000..feb754638eb
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.query;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.common.guava.Sequence;
+import io.druid.query.BySegmentQueryRunner;
+import io.druid.query.FinalizeResultsQueryRunner;
+import io.druid.query.IntervalChunkingQueryRunnerDecorator;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryToolChest;
+import io.druid.query.QueryWatcher;
+import io.druid.segment.Segment;
+
+import java.util.Map;
+
+public class QueryBenchmarkUtil
+{
+  public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
+      QueryRunnerFactory<T, QueryType> factory,
+      String segmentId,
+      Segment adapter
+  )
+  {
+    return new FinalizeResultsQueryRunner<T>(
+        new BySegmentQueryRunner<T>(
+            segmentId, adapter.getDataInterval().getStart(),
+            factory.createRunner(adapter)
+        ),
+        (QueryToolChest<T, Query<T>>) factory.getToolchest()
+    );
+  }
+
+  public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
+  {
+    return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
+      @Override
+      public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
+                                         QueryToolChest<T, ? extends Query<T>> toolChest) {
+        return new QueryRunner<T>() {
+          @Override
+          public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
+          {
+            return delegate.run(query, responseContext);
+          }
+        };
+      }
+    };
+  }
+
+  public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
+  {
+    @Override
+    public void registerQuery(Query query, ListenableFuture future)
+    {
+
+    }
+  };
+}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java
new file mode 100644
index 00000000000..89564a38be4
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.benchmark.datagen.BenchmarkDataGenerator; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.search.SearchQueryQueryToolChest; +import io.druid.query.search.SearchQueryRunnerFactory; +import io.druid.query.search.SearchResultValue; +import io.druid.query.search.search.SearchHit; +import io.druid.query.search.search.SearchQuery; +import io.druid.query.search.search.SearchQueryConfig; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMergerV9; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.serde.ComplexMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; 
+import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(jvmArgsPrepend = "-server", value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class SearchBenchmark +{ + @Param({"1"}) + private int numSegments; + + @Param({"750000"}) + private int rowsPerSegment; + + @Param({"basic.A"}) + private String schemaAndQuery; + + @Param({"1000"}) + private int limit; + + private static final Logger log = new Logger(SearchBenchmark.class); + private static final int RNG_SEED = 9999; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + private List incIndexes; + private List qIndexes; + + private QueryRunnerFactory factory; + private BenchmarkSchemaInfo schemaInfo; + private Druids.SearchQueryBuilder queryBuilder; + private SearchQuery query; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + INDEX_IO = new IndexIO( + JSON_MAPPER, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO); + } + + private static final Map> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + Map basicQueries = new LinkedHashMap<>(); + BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + { // basic.A + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval())); + + List queryAggs = new ArrayList<>(); + queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")); + queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform")); + queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal")); + queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf")); + queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper")); + + Druids.SearchQueryBuilder queryBuilderA = + Druids.newSearchQueryBuilder() + .dataSource("blah") + .granularity(QueryGranularities.ALL) + .intervals(intervalSpec) + .query("123"); + + basicQueries.put("A", queryBuilderA); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + +System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128())); + } + executorService = Execs.multiThreaded(numSegments, "SearchThreadPool"); + + setupQueries(); + + String[] schemaQuery = schemaAndQuery.split("\\."); + String schemaName = schemaQuery[0]; + String queryName = schemaQuery[1]; + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + queryBuilder.limit(limit); + query = queryBuilder.build(); + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + log.info("Generating rows for segment " + i); + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + i, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + 
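+          // progress logging every 10,000 rows keeps long generation runs visible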
log.info(j + " rows generated."); + } + incIndex.add(row); + } + incIndexes.add(incIndex); + } + + File tmpFile = Files.createTempDir(); + log.info("Using temp dir: " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndexes.get(i), + tmpFile, + new IndexSpec() + ); + + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + qIndexes.add(qIndex); + } + + factory = new SearchQueryRunnerFactory( + new SearchQueryQueryToolChest( + new SearchQueryConfig(), + QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() + ), + QueryBenchmarkUtil.NOOP_QUERYWATCHER + ); + } + + private IncrementalIndex makeIncIndex() + { + return new OnheapIncrementalIndex( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(QueryGranularities.NONE) + .withMetrics(schemaInfo.getAggsArray()) + .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .build(), + true, + false, + true, + rowsPerSegment + ); + } + + private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) + { + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), + toolChest + ); + + Sequence queryResult = theRunner.run(query, Maps.newHashMap()); + return Sequences.toList(queryResult, Lists.newArrayList()); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception + { + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "incIndex", + new IncrementalIndexSegment(incIndexes.get(0), "incIndex") + ); + + List> results = SearchBenchmark.runQuery(factory, runner, query); + List hits = results.get(0).getValue().getValue(); + for (SearchHit hit : hits) { + blackhole.consume(hit); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleQueryableIndex(Blackhole blackhole) throws Exception + { + final QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "qIndex", + new QueryableIndexSegment("qIndex", qIndexes.get(0)) + ); + + List> results = SearchBenchmark.runQuery(factory, runner, query); + List hits = results.get(0).getValue().getValue(); + for (SearchHit hit : hits) { + blackhole.consume(hit); + } + } + + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception + { + List> singleSegmentRunners = Lists.newArrayList(); + QueryToolChest toolChest = factory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + final QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentName, + new QueryableIndexSegment(segmentName, qIndexes.get(i)) + ); + singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner)); + } + + QueryRunner theRunner = toolChest.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)), + toolChest + ) + ); + + Sequence> queryResult = theRunner.run(query, Maps.newHashMap()); + List> results = Sequences.toList(queryResult, Lists.>newArrayList()); + + for (Result result : results) { + List hits = result.getValue().getValue(); + for (SearchHit hit : hits) { + 
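+        // pass each hit to the JMH Blackhole so the JIT cannot discard the
+        // query results as dead code and skip the work being measured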
blackhole.consume(hit); + } + } + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java new file mode 100644 index 00000000000..0ef0fbde397 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java @@ -0,0 +1,386 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.benchmark.datagen.BenchmarkDataGenerator; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.query.Result; +import io.druid.query.TableDataSource; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.select.EventHolder; +import io.druid.query.select.PagingSpec; +import io.druid.query.select.SelectQuery; +import io.druid.query.select.SelectQueryEngine; +import io.druid.query.select.SelectQueryQueryToolChest; +import io.druid.query.select.SelectQueryRunnerFactory; +import io.druid.query.select.SelectResultValue; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMergerV9; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.serde.ComplexMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import 
org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(jvmArgsPrepend = "-server", value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class SelectBenchmark +{ + @Param({"1"}) + private int numSegments; + + @Param({"25000"}) + private int rowsPerSegment; + + @Param({"basic.A"}) + private String schemaAndQuery; + + @Param({"1000"}) + private int pagingThreshold; + + private static final Logger log = new Logger(SelectBenchmark.class); + private static final int RNG_SEED = 9999; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + private List incIndexes; + private List qIndexes; + + private QueryRunnerFactory factory; + + private BenchmarkSchemaInfo schemaInfo; + private Druids.SelectQueryBuilder queryBuilder; + private SelectQuery query; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + INDEX_IO = new IndexIO( + JSON_MAPPER, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO); + } + + private static final Map> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + Map basicQueries = new LinkedHashMap<>(); + BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + { // basic.A + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval())); + + Druids.SelectQueryBuilder queryBuilderA = + Druids.newSelectQueryBuilder() + .dataSource(new TableDataSource("blah")) + .dimensionSpecs(DefaultDimensionSpec.toSpec(Arrays.asList())) + .metrics(Arrays.asList()) + .intervals(intervalSpec) + .granularity(QueryGranularities.ALL) + .descending(false); + + basicQueries.put("A", queryBuilderA); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128())); + } + + executorService = Execs.multiThreaded(numSegments, "SelectThreadPool"); + + setupQueries(); + + String[] schemaQuery = schemaAndQuery.split("\\."); + String schemaName = schemaQuery[0]; + String queryName = schemaQuery[1]; + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + queryBuilder.pagingSpec(PagingSpec.newSpec(pagingThreshold)); + query = queryBuilder.build(); + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + i, + schemaInfo.getDataInterval(), + 
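+          // seed offset by segment number: each segment gets distinct but reproducible rows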
rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row); + } + incIndexes.add(incIndex); + } + + File tmpFile = Files.createTempDir(); + log.info("Using temp dir: " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndexes.get(i), + tmpFile, + new IndexSpec() + ); + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + qIndexes.add(qIndex); + } + + factory = new SelectQueryRunnerFactory( + new SelectQueryQueryToolChest( + JSON_MAPPER, + QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() + ), + new SelectQueryEngine(), + QueryBenchmarkUtil.NOOP_QUERYWATCHER + ); + } + + private IncrementalIndex makeIncIndex() + { + return new OnheapIncrementalIndex( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(QueryGranularities.NONE) + .withMetrics(schemaInfo.getAggsArray()) + .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .build(), + true, + false, + true, + rowsPerSegment + ); + } + + private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) + { + + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), + toolChest + ); + + Sequence queryResult = theRunner.run(query, Maps.newHashMap()); + return Sequences.toList(queryResult, Lists.newArrayList()); + } + + // don't run this benchmark with a query that doesn't use QueryGranularity.ALL, + // this pagination function probably doesn't work correctly in that case. 
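+  // Select paging: each response carries a per-segment paging identifier (the
+  // last offset read); advancing every identifier by one and re-running the
+  // query walks each segment forward in pagingThreshold-sized chunks until a
+  // response comes back empty.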
+ private SelectQuery incrementQueryPagination(SelectQuery query, SelectResultValue prevResult) + { + Map pagingIdentifiers = prevResult.getPagingIdentifiers(); + Map newPagingIdentifers = new HashMap<>(); + + for (String segmentId : pagingIdentifiers.keySet()) { + int newOffset = pagingIdentifiers.get(segmentId) + 1; + newPagingIdentifers.put(segmentId, newOffset); + } + + return query.withPagingSpec(new PagingSpec(newPagingIdentifers, pagingThreshold)); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryIncrementalIndex(Blackhole blackhole) throws Exception + { + SelectQuery queryCopy = query.withPagingSpec(PagingSpec.newSpec(pagingThreshold)); + + String segmentId = "incIndex"; + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentId, + new IncrementalIndexSegment(incIndexes.get(0), segmentId) + ); + + boolean done = false; + while (!done) { + List> results = SelectBenchmark.runQuery(factory, runner, queryCopy); + SelectResultValue result = results.get(0).getValue(); + if (result.getEvents().size() == 0) { + done = true; + } else { + for (EventHolder eh : result.getEvents()) { + blackhole.consume(eh); + } + queryCopy = incrementQueryPagination(queryCopy, result); + } + } + } + + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryQueryableIndex(Blackhole blackhole) throws Exception + { + SelectQuery queryCopy = query.withPagingSpec(PagingSpec.newSpec(pagingThreshold)); + + String segmentId = "qIndex"; + QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentId, + new QueryableIndexSegment(segmentId, qIndexes.get(0)) + ); + + boolean done = false; + while (!done) { + List> results = SelectBenchmark.runQuery(factory, runner, queryCopy); + SelectResultValue result = results.get(0).getValue(); + if (result.getEvents().size() == 0) { + done = true; + } else { + for (EventHolder eh : result.getEvents()) { + blackhole.consume(eh); + } + queryCopy = incrementQueryPagination(queryCopy, result); + } + } + } + + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception + { + SelectQuery queryCopy = query.withPagingSpec(PagingSpec.newSpec(pagingThreshold)); + + String segmentName; + List>> singleSegmentRunners = Lists.newArrayList(); + QueryToolChest toolChest = factory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + segmentName = "qIndex" + i; + QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentName, + new QueryableIndexSegment(segmentName, qIndexes.get(i)) + ); + singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner)); + } + + QueryRunner theRunner = toolChest.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)), + toolChest + ) + ); + + + boolean done = false; + while (!done) { + Sequence> queryResult = theRunner.run(queryCopy, Maps.newHashMap()); + List> results = Sequences.toList(queryResult, Lists.>newArrayList()); + + SelectResultValue result = results.get(0).getValue(); + + if (result.getEvents().size() == 0) { + done = true; + } else { + for (EventHolder eh : result.getEvents()) { + blackhole.consume(eh); + } + queryCopy = incrementQueryPagination(queryCopy, result); + } + } + } +} \ No newline at end of file diff --git 
a/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java new file mode 100644 index 00000000000..051a2370c77 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java @@ -0,0 +1,354 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.benchmark.datagen.BenchmarkDataGenerator; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMergerV9; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import 
io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.serde.ComplexMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(jvmArgsPrepend = "-server", value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class TimeseriesBenchmark +{ + @Param({"1"}) + private int numSegments; + + @Param({"750000"}) + private int rowsPerSegment; + + @Param({"basic.A"}) + private String schemaAndQuery; + + private static final Logger log = new Logger(TimeseriesBenchmark.class); + private static final int RNG_SEED = 9999; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + private List incIndexes; + private List qIndexes; + + private QueryRunnerFactory factory; + private BenchmarkSchemaInfo schemaInfo; + private TimeseriesQuery query; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + INDEX_IO = new IndexIO( + JSON_MAPPER, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO); + } + + private static final Map> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + Map basicQueries = new LinkedHashMap<>(); + BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + { // basic.A + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval())); + + List queryAggs = new ArrayList<>(); + queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")); + queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform")); + queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal")); + queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf")); + queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper")); + + TimeseriesQuery queryA = + Druids.newTimeseriesQueryBuilder() + .dataSource("blah") + .granularity(QueryGranularities.ALL) + .intervals(intervalSpec) + .aggregators(queryAggs) + .descending(false) + .build(); + + basicQueries.put("A", queryA); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128())); + } + + executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool"); + + setupQueries(); + + String[] schemaQuery = 
schemaAndQuery.split("\\."); + String schemaName = schemaQuery[0]; + String queryName = schemaQuery[1]; + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + log.info("Generating rows for segment " + i); + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + i, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row); + } + log.info(rowsPerSegment + " rows generated"); + incIndexes.add(incIndex); + } + + File tmpFile = Files.createTempDir(); + log.info("Using temp dir: " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndexes.get(i), + tmpFile, + new IndexSpec() + ); + + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + qIndexes.add(qIndex); + } + + factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest( + QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() + ), + new TimeseriesQueryEngine(), + QueryBenchmarkUtil.NOOP_QUERYWATCHER + ); + } + + private IncrementalIndex makeIncIndex() + { + return new OnheapIncrementalIndex( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(QueryGranularities.NONE) + .withMetrics(schemaInfo.getAggsArray()) + .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .build(), + true, + false, + true, + rowsPerSegment + ); + } + + private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) + { + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), + toolChest + ); + + Sequence queryResult = theRunner.run(query, Maps.newHashMap()); + return Sequences.toList(queryResult, Lists.newArrayList()); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception + { + QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "incIndex", + new IncrementalIndexSegment(incIndexes.get(0), "incIndex") + ); + + List> results = TimeseriesBenchmark.runQuery(factory, runner, query); + for (Result result : results) { + blackhole.consume(result); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleQueryableIndex(Blackhole blackhole) throws Exception + { + final QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "qIndex", + new QueryableIndexSegment("qIndex", qIndexes.get(0)) + ); + + List> results = TimeseriesBenchmark.runQuery(factory, runner, query); + for (Result result : results) { + blackhole.consume(result); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryFilteredSingleQueryableIndex(Blackhole blackhole) throws Exception + { + final QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + "qIndex", + new QueryableIndexSegment("qIndex", qIndexes.get(0)) + ); + + DimFilter filter = new SelectorDimFilter("dimSequential", "399", null); + Query 
filteredQuery = query.withDimFilter(filter); + + List> results = TimeseriesBenchmark.runQuery(factory, runner, filteredQuery); + for (Result result : results) { + blackhole.consume(result); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception + { + List>> singleSegmentRunners = Lists.newArrayList(); + QueryToolChest toolChest = factory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentName, + new QueryableIndexSegment(segmentName, qIndexes.get(i)) + ); + singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner)); + } + + QueryRunner theRunner = toolChest.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)), + toolChest + ) + ); + + Sequence> queryResult = theRunner.run(query, Maps.newHashMap()); + List> results = Sequences.toList(queryResult, Lists.>newArrayList()); + + for (Result result : results) { + blackhole.consume(result); + } + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java new file mode 100644 index 00000000000..12646f6e867 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java @@ -0,0 +1,338 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.benchmark.datagen.BenchmarkDataGenerator; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.offheap.OffheapBufferPool; +import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import io.druid.query.topn.TopNQuery; +import io.druid.query.topn.TopNQueryBuilder; +import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryQueryToolChest; +import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.query.topn.TopNResultValue; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMergerV9; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.serde.ComplexMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(jvmArgsPrepend = "-server", value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class TopNBenchmark +{ + @Param({"1"}) + private int numSegments; + + @Param({"750000"}) + private int rowsPerSegment; + + @Param({"basic.A"}) + private String schemaAndQuery; + + @Param({"10"}) + private int 
threshold; + + private static final Logger log = new Logger(TopNBenchmark.class); + private static final int RNG_SEED = 9999; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + private List incIndexes; + private List qIndexes; + + private QueryRunnerFactory factory; + private BenchmarkSchemaInfo schemaInfo; + private TopNQueryBuilder queryBuilder; + private TopNQuery query; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + INDEX_IO = new IndexIO( + JSON_MAPPER, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO); + } + + private static final Map> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + Map basicQueries = new LinkedHashMap<>(); + BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + { // basic.A + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval())); + + List queryAggs = new ArrayList<>(); + queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")); + queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform")); + queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal")); + queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf")); + queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper")); + + TopNQueryBuilder queryBuilderA = new TopNQueryBuilder() + .dataSource("blah") + .granularity(QueryGranularities.ALL) + .dimension("dimSequential") + .metric("sumFloatNormal") + .intervals(intervalSpec) + .aggregators(queryAggs); + + basicQueries.put("A", queryBuilderA); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128())); + } + + executorService = Execs.multiThreaded(numSegments, "TopNThreadPool"); + + setupQueries(); + + String[] schemaQuery = schemaAndQuery.split("\\."); + String schemaName = schemaQuery[0]; + String queryName = schemaQuery[1]; + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + queryBuilder.threshold(threshold); + query = queryBuilder.build(); + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + log.info("Generating rows for segment " + i); + + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + i, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row); + } + incIndexes.add(incIndex); + } + + File tmpFile = Files.createTempDir(); + log.info("Using temp dir: " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndexes.get(i), + tmpFile, + new IndexSpec() + ); + + QueryableIndex qIndex = 
+
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+    }
+
+    executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
+
+    setupQueries();
+
+    String[] schemaQuery = schemaAndQuery.split("\\.");
+    String schemaName = schemaQuery[0];
+    String queryName = schemaQuery[1];
+
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+    queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
+    queryBuilder.threshold(threshold);
+    query = queryBuilder.build();
+
+    incIndexes = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      log.info("Generating rows for segment " + i);
+
+      BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+          schemaInfo.getColumnSchemas(),
+          RNG_SEED + i,
+          schemaInfo.getDataInterval(),
+          rowsPerSegment
+      );
+
+      IncrementalIndex incIndex = makeIncIndex();
+
+      for (int j = 0; j < rowsPerSegment; j++) {
+        InputRow row = gen.nextRow();
+        if (j % 10000 == 0) {
+          log.info(j + " rows generated.");
+        }
+        incIndex.add(row);
+      }
+      incIndexes.add(incIndex);
+    }
+
+    File tmpFile = Files.createTempDir();
+    log.info("Using temp dir: " + tmpFile.getAbsolutePath());
+    tmpFile.deleteOnExit();
+
+    qIndexes = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      File indexFile = INDEX_MERGER_V9.persist(
+          incIndexes.get(i),
+          tmpFile,
+          new IndexSpec()
+      );
+
+      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
+      qIndexes.add(qIndex);
+    }
+
+    factory = new TopNQueryRunnerFactory(
+        new OffheapBufferPool(250000000, Integer.MAX_VALUE),
+        new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
+        QueryBenchmarkUtil.NOOP_QUERYWATCHER
+    );
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new OnheapIncrementalIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(QueryGranularities.NONE)
+            .withMetrics(schemaInfo.getAggsArray())
+            .withDimensionsSpec(new DimensionsSpec(null, null, null))
+            .build(),
+        true,
+        false,
+        true,
+        rowsPerSegment
+    );
+  }
+
+  private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
+  {
+    QueryToolChest toolChest = factory.getToolchest();
+    QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
+        toolChest
+    );
+
+    Sequence<T> queryResult = theRunner.run(query, Maps.newHashMap());
+    return Sequences.toList(queryResult, Lists.<T>newArrayList());
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception
+  {
+    QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+        factory,
+        "incIndex",
+        new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
+    );
+
+    List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
+  {
+    final QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+        factory,
+        "qIndex",
+        new QueryableIndexSegment("qIndex", qIndexes.get(0))
+    );
+
+    List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
+  {
+    List<QueryRunner<Result<TopNResultValue>>> singleSegmentRunners = Lists.newArrayList();
+    QueryToolChest toolChest = factory.getToolchest();
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = "qIndex" + i;
+      QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+          factory,
+          segmentName,
+          new QueryableIndexSegment(segmentName, qIndexes.get(i))
+      );
+      singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
+    }
+
+    QueryRunner theRunner = toolChest.postMergeQueryDecoration(
+        new FinalizeResultsQueryRunner<>(
+            toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+            toolChest
+        )
+    );
+
+    Sequence<Result<TopNResultValue>> queryResult = theRunner.run(query, Maps.newHashMap());
+    List<Result<TopNResultValue>> results = Sequences.toList(queryResult, Lists.<Result<TopNResultValue>>newArrayList());
+
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+  }
+}
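As a usage sketch (not part of this patch; the runner class below and its parameter choices are hypothetical), the benchmark above can be launched programmatically through the standard JMH Runner API instead of the JMH Maven plugin:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    // Hypothetical helper, e.g. placed alongside TopNBenchmark in io.druid.benchmark.query.
    public class TopNBenchmarkRunner
    {
      public static void main(String[] args) throws RunnerException
      {
        Options opts = new OptionsBuilder()
            .include(TopNBenchmark.class.getSimpleName())
            .param("rowsPerSegment", "750000")  // matches the @Param default
            .param("schemaAndQuery", "basic.A")
            .param("threshold", "10")
            .build();
        new Runner(opts).run();                 // runs all three @Benchmark methods
      }
    }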
diff --git a/benchmarks/src/test/java/io/druid/benchmark/BenchmarkDataGeneratorTest.java b/benchmarks/src/test/java/io/druid/benchmark/BenchmarkDataGeneratorTest.java
new file mode 100644
index 00000000000..a3733939c90
--- /dev/null
+++ b/benchmarks/src/test/java/io/druid/benchmark/BenchmarkDataGeneratorTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark;
+
+import io.druid.benchmark.datagen.BenchmarkColumnSchema;
+import io.druid.benchmark.datagen.BenchmarkDataGenerator;
+import io.druid.data.input.InputRow;
+import io.druid.segment.column.ValueType;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// Doesn't assert behavior right now, just generates rows and prints out some distribution numbers
+public class BenchmarkDataGeneratorTest
+{
+  @Test
+  public void testSequential() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeSequential(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            10,
+            20
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeEnumeratedSequential(
+            "dimB",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar")
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeSequential(
+            "dimC",
+            ValueType.STRING,
+            false,
+            1,
+            0.50,
+            30,
+            40
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("S-ROW: " + row);
+      tracker.addRow(row);
+    }
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testDiscreteUniform() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeDiscreteUniform(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            10,
+            20
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeEnumeratedDiscreteUniform(
+            "dimB",
+            ValueType.STRING,
+            false,
+            4,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar")
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeDiscreteUniform(
+            "dimC",
+            ValueType.STRING,
+            false,
+            1,
+            0.50,
+            10,
+            20
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeDiscreteUniform(
+            "dimD",
+            ValueType.FLOAT,
+            false,
+            1,
+            null,
+            100,
+            120
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("U-ROW: " + row);
+
+      tracker.addRow(row);
+    }
+
+    tracker.printStuff();
+  }
+
+
+  @Test
+  public void testRoundedNormal() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeNormal(
+            "dimA",
+            ValueType.FLOAT,
+            false,
+            1,
+            null,
+            50.0,
+            1.0,
+            true
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeNormal(
+            "dimB",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            1000.0,
+            10.0,
+            true
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 1000000; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("N-ROW: " + row);
+
+      tracker.addRow(row);
+    }
+
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testZipf() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeZipf(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            1000,
+            2000,
+            1.0
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeZipf(
+            "dimB",
+            ValueType.FLOAT,
+            false,
+            1,
+            null,
+            99990,
+            99999,
+            1.0
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeEnumeratedZipf(
+            "dimC",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("1-Hello", "2-World", "3-Foo", "4-Bar", "5-BA5EBA11", "6-Rocky", "7-Mango", "8-Contango"),
+            1.0
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("Z-ROW: " + row);
+
+      tracker.addRow(row);
+    }
+
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testEnumerated() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar"),
+            Arrays.asList(0.5, 0.25, 0.15, 0.10)
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 10000; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("Z-ROW: " + row);
+
+      tracker.addRow(row);
+    }
+
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testNormal() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeNormal(
+            "dimA",
+            ValueType.FLOAT,
+            false,
+            1,
+            null,
+            8.0,
+            1.0,
+            false
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeNormal(
+            "dimB",
+            ValueType.STRING,
+            false,
+            1,
+            0.50,
+            88.0,
+            2.0,
+            false
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("N-ROW: " + row);
+
+      tracker.addRow(row);
+    }
+
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testRealUniform() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeContinuousUniform(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            10.0,
+            50.0
+        )
+    );
+
+    schemas.add(
+        BenchmarkColumnSchema.makeContinuousUniform(
+            "dimB",
+            ValueType.FLOAT,
+            false,
+            1,
+            null,
+            210.0,
+            250.0
+        )
+    );
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, 0, 0, 1000.0);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("U-ROW: " + row);
+
+      tracker.addRow(row);
+    }
+
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testIntervalBasedTimeGeneration() throws Exception
+  {
+    List<BenchmarkColumnSchema> schemas = new ArrayList<>();
+
+    schemas.add(
+        BenchmarkColumnSchema.makeEnumeratedSequential(
+            "dimB",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar")
+        )
+    );
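+
+    // A note on the two constructor forms (inferred from their usage in these
+    // tests, not stated in this patch): the Interval-based form below spreads
+    // generated row timestamps across the given Interval for the requested
+    // number of rows, while the five-argument form used by the other tests
+    // advances timestamps from a fixed start time.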
+
+    BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(schemas, 9999, new Interval(50000, 600000), 100);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator.nextRow();
+      //System.out.println("S-ROW: " + row);
+    }
+
+    BenchmarkDataGenerator dataGenerator2 = new BenchmarkDataGenerator(schemas, 9999, new Interval(50000, 50001), 100);
+    for (int i = 0; i < 100; i++) {
+      InputRow row = dataGenerator2.nextRow();
+      //System.out.println("S2-ROW: " + row);
+    }
+  }
+
+
+  private class RowValueTracker
+  {
+    private Map<String, Map<Object, Integer>> dimensionMap;
+
+    public RowValueTracker()
+    {
+      dimensionMap = new HashMap<>();
+    }
+
+    public void addRow(InputRow row)
+    {
+      for (String dim : row.getDimensions()) {
+        if (dimensionMap.get(dim) == null) {
+          dimensionMap.put(dim, new HashMap<Object, Integer>());
+        }
+
+        Map<Object, Integer> valueMap = dimensionMap.get(dim);
+        Object dimVals = row.getRaw(dim);
+        if (dimVals == null) {
+          dimVals = Collections.singletonList(null);
+        } else if (!(dimVals instanceof List)) {
+          dimVals = Collections.singletonList(dimVals);
+        }
+        List dimValsList = (List) dimVals;
+
+        for (Object val : dimValsList) {
+          if (val == null) {
+            val = "";
+          }
+          if (valueMap.get(val) == null) {
+            valueMap.put(val, 0);
+          }
+          valueMap.put(val, valueMap.get(val) + 1);
+        }
+      }
+    }
+
+
+    public void printStuff()
+    {
+      System.out.println();
+      for (String dim : dimensionMap.keySet()) {
+        System.out.println("DIMENSION " + dim + "\n============");
+        Map<Object, Integer> valueMap = dimensionMap.get(dim);
+
+        List<Comparable> valList = new ArrayList<>();
+        for (Object val : valueMap.keySet()) {
+          valList.add((Comparable) val);
+        }
+
+        Collections.sort(valList);
+
+        for (Comparable val : valList) {
+          System.out.println(" VAL: " + val.toString() + " CNT: " + valueMap.get(val));
+        }
+        System.out.println();
+      }
+    }
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
index 3828750c1ff..830e41ebb60 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
@@ -39,6 +39,11 @@ public class GroupByQueryConfig
     return singleThreaded;
   }
 
+  public void setSingleThreaded(boolean singleThreaded)
+  {
+    this.singleThreaded = singleThreaded;
+  }
+
   public int getMaxIntermediateRows()
   {
     return maxIntermediateRows;
@@ -53,4 +58,9 @@ public class GroupByQueryConfig
   {
     return maxResults;
   }
+
+  public void setMaxResults(int maxResults)
+  {
+    this.maxResults = maxResults;
+  }
 }
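The new setters above make GroupByQueryConfig adjustable from plain Java code, presumably so benchmark and test harnesses can tune it without going through property injection; a minimal usage sketch (the values are illustrative):

    GroupByQueryConfig config = new GroupByQueryConfig();
    config.setSingleThreaded(true);  // exercise the single-threaded merge path
    config.setMaxResults(1000000);   // raise the result cap for large synthetic segments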