From 10e57d5f9ed003e032c82240045125002903a5bb Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 1 Feb 2019 14:04:13 -0800 Subject: [PATCH 1/3] Moved Scan Builder to Druids class and started on Scan Benchmark setup --- .../druid/benchmark/query/ScanBenchmark.java | 371 ++++++++++++++++++ .../benchmark/query/SelectBenchmark.java | 1 - .../java/org/apache/druid/query/Druids.java | 157 ++++++++ .../apache/druid/query/scan/ScanQuery.java | 171 +------- .../apache/druid/query/DoubleStorageTest.java | 14 +- .../query/scan/MultiSegmentScanQueryTest.java | 5 +- .../druid/query/scan/ScanQueryRunnerTest.java | 5 +- .../sql/calcite/BaseCalciteQueryTest.java | 5 +- 8 files changed, 550 insertions(+), 179 deletions(-) create mode 100644 benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java new file mode 100644 index 00000000000..4085467a3a9 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator; +import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; +import org.apache.druid.benchmark.datagen.BenchmarkSchemas; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.hll.HyperLogLogHash; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.Druids; +import org.apache.druid.query.FinalizeResultsQueryRunner; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import org.apache.druid.query.extraction.DimExtractionFn; +import org.apache.druid.query.extraction.IdentityExtractionFn; +import org.apache.druid.query.extraction.LowerExtractionFn; +import org.apache.druid.query.extraction.StrlenExtractionFn; +import org.apache.druid.query.extraction.SubstringDimExtractionFn; +import org.apache.druid.query.extraction.UpperExtractionFn; +import org.apache.druid.query.filter.AndDimFilter; +import org.apache.druid.query.filter.BoundDimFilter; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.search.SearchQueryConfig; +import org.apache.druid.query.search.SearchQueryQueryToolChest; +import org.apache.druid.query.search.SearchQueryRunnerFactory; +import org.apache.druid.query.search.SearchStrategySelector; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.serde.ComplexMetrics; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import 
java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class ScanBenchmark +{ + @Param({"1"}) + private int numSegments; + + @Param({"750000"}) + private int rowsPerSegment; + + @Param({"basic.A"}) + private String schemaAndQuery; + + @Param({"1000"}) + private int limit; + + private static final Logger log = new Logger(ScanBenchmark.class); + private static final ObjectMapper JSON_MAPPER; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + + private List<IncrementalIndex> incIndexes; + private List<QueryableIndex> qIndexes; + + private QueryRunnerFactory factory; + private BenchmarkSchemaInfo schemaInfo; + private Druids.ScanQueryBuilder queryBuilder; + private ScanQuery query; + private File tmpDir; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + INDEX_IO = new IndexIO( + JSON_MAPPER, + () -> 0 + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + } + + private static final Map<String, Map<String, Druids.ScanQueryBuilder>> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + final Map<String, Druids.ScanQueryBuilder> basicQueries = new LinkedHashMap<>(); + final BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + final List<String> queryTypes = ImmutableList.of("A", "B", "C", "D"); + for (final String eachType : queryTypes) { + basicQueries.put(eachType, makeQuery(eachType, basicSchema)); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + private static Druids.ScanQueryBuilder makeQuery(final String name, final BenchmarkSchemaInfo basicSchema) + { + switch (name) { + case "A": + return basicA(basicSchema); + case "B": + return basicB(basicSchema); + case "C": + return basicC(basicSchema); + case "D": + return basicD(basicSchema); + default: + return null; + } + } + + private static Druids.ScanQueryBuilder basicA(final BenchmarkSchemaInfo basicSchema) + { + final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); + + return Druids.newScanQueryBuilder() + .dataSource("blah") + .intervals(intervalSpec) + .query("123"); + } + + private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema) + { + final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); + + final List<String> dimUniformFilterVals = new ArrayList<>(); + int resultNum = (int) (100000 * 0.1); + int step = 100000 / resultNum; + for (int i = 1; i < 100001 && dimUniformFilterVals.size() < resultNum; i += step) { + dimUniformFilterVals.add(String.valueOf(i)); + } + + List<String> dimHyperUniqueFilterVals = new ArrayList<>(); + resultNum = (int) (100000 * 0.1); + step = 100000 / resultNum; + for (int i = 0; i < 100001 && dimHyperUniqueFilterVals.size() < resultNum; i += step) { + dimHyperUniqueFilterVals.add(String.valueOf(i)); + } + + final List<DimFilter> dimFilters = new ArrayList<>(); + dimFilters.add(new InDimFilter("dimUniform", dimUniformFilterVals, null)); + dimFilters.add(new InDimFilter("dimHyperUnique", dimHyperUniqueFilterVals, null)); + + return Druids.newScanQueryBuilder(); // TODO + } + + private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema) + { + final QuerySegmentSpec intervalSpec = new
MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); + + final List<String> dimUniformFilterVals = new ArrayList<>(); + final int resultNum = (int) (100000 * 0.1); + final int step = 100000 / resultNum; + for (int i = 1; i < 100001 && dimUniformFilterVals.size() < resultNum; i += step) { + dimUniformFilterVals.add(String.valueOf(i)); + } + + final String dimName = "dimUniform"; + final List<DimFilter> dimFilters = new ArrayList<>(); + dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, IdentityExtractionFn.getInstance())); + dimFilters.add(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance())); + dimFilters.add(new BoundDimFilter(dimName, "100", "10000", true, true, true, new DimExtractionFn() + { + @Override + public byte[] getCacheKey() + { + return new byte[]{0xF}; + } + + @Override + public String apply(String value) + { + return String.valueOf(Long.parseLong(value) + 1); + } + + @Override + public boolean preservesOrdering() + { + return false; + } + + @Override + public ExtractionType getExtractionType() + { + return ExtractionType.ONE_TO_ONE; + } + }, null)); + dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, new LowerExtractionFn(null))); + dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, new UpperExtractionFn(null))); + dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, new SubstringDimExtractionFn(1, 3))); + + return Druids.newScanQueryBuilder(); // TODO + } + + private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema) + { + final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(basicSchema.getDataInterval()) + ); + + final List<String> dimUniformFilterVals = new ArrayList<>(); + final int resultNum = (int) (100000 * 0.1); + final int step = 100000 / resultNum; + for (int i = 1; i < 100001 && dimUniformFilterVals.size() < resultNum; i += step) { + dimUniformFilterVals.add(String.valueOf(i)); + } + + final String dimName = "dimUniform"; + final List<DimFilter> dimFilters = new ArrayList<>(); + dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, null)); + dimFilters.add(new SelectorDimFilter(dimName, "3", null)); + dimFilters.add(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null)); + + return Druids.newScanQueryBuilder(); // TODO + } + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + +System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); + } + executorService = Execs.multiThreaded(numSegments, "SearchThreadPool"); + + setupQueries(); + + String[] schemaQuery = schemaAndQuery.split("\\."); + String schemaName = schemaQuery[0]; + String queryName = schemaQuery[1]; + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + queryBuilder.limit(limit); + query = queryBuilder.build(); + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + log.info("Generating rows for segment " + i); + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + System.currentTimeMillis(), + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row);
+ } + incIndexes.add(incIndex); + } + + tmpDir = Files.createTempDir(); + log.info("Using temp dir: " + tmpDir.getAbsolutePath()); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndexes.get(i), + tmpDir, + new IndexSpec(), + null + ); + + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + qIndexes.add(qIndex); + } + + final ScanQueryConfig config = new ScanQueryConfig().setLegacy(false); + factory = new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + config, + DefaultGenericQueryMetricsFactory.instance() + ), + new ScanQueryEngine() + ); + } + + @TearDown + public void tearDown() throws IOException + { + FileUtils.deleteDirectory(tmpDir); + } + + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); + } + + private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query) + { + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), + toolChest + ); + + Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>()); + return queryResult.toList(); + } +} diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java index bd212646566..dbc69421548 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java @@ -315,7 +315,6 @@ public class SelectBenchmark } } - @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index 2c2a6537b85..6ef7f93afb6 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -36,6 +36,7 @@ import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.metadata.metadata.ColumnIncluderator; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.search.ContainsSearchQuerySpec; import org.apache.druid.query.search.FragmentSearchQuerySpec; import org.apache.druid.query.search.InsensitiveContainsSearchQuerySpec; @@ -896,6 +897,162 @@ public class Druids return new SelectQueryBuilder(); } + /** + * A Builder for ScanQuery. + *
<p/>
+   * Required: dataSource(), intervals() must be called before build()
+   * <p/>
+   * Usage example:
+   * <pre><code>
+   *   ScanQuery query = Druids.newScanQueryBuilder()
+   *                           .dataSource("Example")
+   *                           .intervals(new LegacySegmentSpec("2010/2013"))
+   *                           .build();
+   * </code></pre>
+ * + * @see ScanQuery + */ + public static class ScanQueryBuilder + { + private DataSource dataSource; + private QuerySegmentSpec querySegmentSpec; + private VirtualColumns virtualColumns; + private Map<String, Object> context; + private String resultFormat; + private int batchSize; + private long limit; + private DimFilter dimFilter; + private List<String> columns; + private Boolean legacy; + + public ScanQueryBuilder() + { + dataSource = null; + querySegmentSpec = null; + virtualColumns = null; + context = null; + resultFormat = null; + batchSize = 0; + limit = 0; + dimFilter = null; + columns = new ArrayList<>(); + legacy = null; + } + + public ScanQuery build() + { + return new ScanQuery( + dataSource, + querySegmentSpec, + virtualColumns, + resultFormat, + batchSize, + limit, + dimFilter, + columns, + legacy, + context + ); + } + + public static ScanQueryBuilder copy(ScanQuery query) + { + return new ScanQueryBuilder() + .dataSource(query.getDataSource()) + .intervals(query.getQuerySegmentSpec()) + .virtualColumns(query.getVirtualColumns()) + .resultFormat(query.getResultFormat()) + .batchSize(query.getBatchSize()) + .limit(query.getLimit()) + .filters(query.getFilter()) + .columns(query.getColumns()) + .legacy(query.isLegacy()) + .context(query.getContext()); + } + + public ScanQueryBuilder dataSource(String ds) + { + dataSource = new TableDataSource(ds); + return this; + } + + public ScanQueryBuilder dataSource(DataSource ds) + { + dataSource = ds; + return this; + } + + public ScanQueryBuilder intervals(QuerySegmentSpec q) + { + querySegmentSpec = q; + return this; + } + + public ScanQueryBuilder virtualColumns(VirtualColumns virtualColumns) + { + this.virtualColumns = virtualColumns; + return this; + } + + public ScanQueryBuilder virtualColumns(VirtualColumn... virtualColumns) + { + return virtualColumns(VirtualColumns.create(Arrays.asList(virtualColumns))); + } + + public ScanQueryBuilder context(Map<String, Object> c) + { + context = c; + return this; + } + + public ScanQueryBuilder resultFormat(String r) + { + resultFormat = r; + return this; + } + + public ScanQueryBuilder batchSize(int b) + { + batchSize = b; + return this; + } + + public ScanQueryBuilder limit(long l) + { + limit = l; + return this; + } + + public ScanQueryBuilder filters(DimFilter f) + { + dimFilter = f; + return this; + } + + public ScanQueryBuilder columns(List<String> c) + { + columns = c; + return this; + } + + public ScanQueryBuilder columns(String... c) + { + columns = Arrays.asList(c); + return this; + } + + public ScanQueryBuilder legacy(Boolean legacy) + { + this.legacy = legacy; + return this; + } + } + + public static ScanQueryBuilder newScanQueryBuilder() + { + return new ScanQueryBuilder(); + } + /** * A Builder for DataSourceMetadataQuery. *
<p/>
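For anyone trying the relocated builder, here is a minimal sketch of the new call site; the datasource name, interval, and column names below are illustrative values, not taken from this patch:

    // Hypothetical usage of the relocated builder ("wikipedia", the interval,
    // and the column names are made-up example values).
    ScanQuery query = Druids.newScanQueryBuilder()
        .dataSource("wikipedia")
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("2015-09-12/2015-09-13"))
        ))
        .columns("page", "added")
        .limit(100)
        .legacy(false)
        .build();

Callers that previously wrote ScanQuery.newScanQueryBuilder() simply switch to Druids.newScanQueryBuilder(), which is exactly what the test changes below do.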
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index e780d36e10b..c6d1583b59c 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -24,15 +24,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; +import org.apache.druid.query.Druids; import org.apache.druid.query.Query; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.spec.QuerySegmentSpec; -import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -137,30 +134,30 @@ public class ScanQuery extends BaseQuery<ScanResultValue> public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) { - return ScanQueryBuilder.copy(this).legacy(legacy != null ? legacy : scanQueryConfig.isLegacy()).build(); + return Druids.ScanQueryBuilder.copy(this).legacy(legacy != null ? legacy : scanQueryConfig.isLegacy()).build(); } @Override public Query<ScanResultValue> withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) { - return ScanQueryBuilder.copy(this).intervals(querySegmentSpec).build(); + return Druids.ScanQueryBuilder.copy(this).intervals(querySegmentSpec).build(); } @Override public Query<ScanResultValue> withDataSource(DataSource dataSource) { - return ScanQueryBuilder.copy(this).dataSource(dataSource).build(); + return Druids.ScanQueryBuilder.copy(this).dataSource(dataSource).build(); } @Override public Query<ScanResultValue> withOverriddenContext(Map<String, Object> contextOverrides) { - return ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build(); + return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build(); } public ScanQuery withDimFilter(DimFilter dimFilter) { - return ScanQueryBuilder.copy(this).filters(dimFilter).build(); + return Druids.ScanQueryBuilder.copy(this).filters(dimFilter).build(); } @Override @@ -206,160 +203,4 @@ public class ScanQuery extends BaseQuery<ScanResultValue> ", legacy=" + legacy + '}'; } - - /** - * A Builder for ScanQuery. - *
<p/>
-   * Required: dataSource(), intervals() must be called before build()
-   * <p/>
-   * Usage example:
-   * <pre><code>
-   *   ScanQuery query = new ScanQueryBuilder()
-   *                                  .dataSource("Example")
-   *                                  .interval("2010/2013")
-   *                                  .build();
-   * </code></pre>
- * - * @see ScanQuery - */ - public static class ScanQueryBuilder - { - private DataSource dataSource; - private QuerySegmentSpec querySegmentSpec; - private VirtualColumns virtualColumns; - private Map<String, Object> context; - private String resultFormat; - private int batchSize; - private long limit; - private DimFilter dimFilter; - private List<String> columns; - private Boolean legacy; - - public ScanQueryBuilder() - { - dataSource = null; - querySegmentSpec = null; - virtualColumns = null; - context = null; - resultFormat = null; - batchSize = 0; - limit = 0; - dimFilter = null; - columns = new ArrayList<>(); - legacy = null; - } - - public ScanQuery build() - { - return new ScanQuery( - dataSource, - querySegmentSpec, - virtualColumns, - resultFormat, - batchSize, - limit, - dimFilter, - columns, - legacy, - context - ); - } - - public static ScanQueryBuilder copy(ScanQuery query) - { - return new ScanQueryBuilder() - .dataSource(query.getDataSource()) - .intervals(query.getQuerySegmentSpec()) - .virtualColumns(query.getVirtualColumns()) - .resultFormat(query.getResultFormat()) - .batchSize(query.getBatchSize()) - .limit(query.getLimit()) - .filters(query.getFilter()) - .columns(query.getColumns()) - .legacy(query.isLegacy()) - .context(query.getContext()); - } - - public ScanQueryBuilder dataSource(String ds) - { - dataSource = new TableDataSource(ds); - return this; - } - - public ScanQueryBuilder dataSource(DataSource ds) - { - dataSource = ds; - return this; - } - - public ScanQueryBuilder intervals(QuerySegmentSpec q) - { - querySegmentSpec = q; - return this; - } - - public ScanQueryBuilder virtualColumns(VirtualColumns virtualColumns) - { - this.virtualColumns = virtualColumns; - return this; - } - - public ScanQueryBuilder virtualColumns(VirtualColumn... virtualColumns) - { - return virtualColumns(VirtualColumns.create(Arrays.asList(virtualColumns))); - } - - public ScanQueryBuilder context(Map<String, Object> c) - { - context = c; - return this; - } - - public ScanQueryBuilder resultFormat(String r) - { - resultFormat = r; - return this; - } - - public ScanQueryBuilder batchSize(int b) - { - batchSize = b; - return this; - } - - public ScanQueryBuilder limit(long l) - { - limit = l; - return this; - } - - public ScanQueryBuilder filters(DimFilter f) - { - dimFilter = f; - return this; - } - - public ScanQueryBuilder columns(List<String> c) - { - columns = c; - return this; - } - - public ScanQueryBuilder columns(String...
c) - { - columns = Arrays.asList(c); - return this; - } - - public ScanQueryBuilder legacy(Boolean legacy) - { - this.legacy = legacy; - return this; - } - } - - public static ScanQueryBuilder newScanQueryBuilder() - { - return new ScanQueryBuilder(); - } } diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java index e6f0b5db0c5..a85b41d6116 100644 --- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java +++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java @@ -97,14 +97,14 @@ public class DoubleStorageTest new ScanQueryEngine() ); - private ScanQuery.ScanQueryBuilder newTestQuery() + private Druids.ScanQueryBuilder newTestQuery() { - return ScanQuery.newScanQueryBuilder() - .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) - .columns(Collections.emptyList()) - .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) - .limit(Integer.MAX_VALUE) - .legacy(false); + return Druids.newScanQueryBuilder() + .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) + .columns(Collections.emptyList()) + .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) + .limit(Integer.MAX_VALUE) + .legacy(false); } diff --git a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java index e4f837ba1c5..e138d73a1fe 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -177,9 +178,9 @@ public class MultiSegmentScanQueryTest this.batchSize = batchSize; } - private ScanQuery.ScanQueryBuilder newBuilder() + private Druids.ScanQueryBuilder newBuilder() { - return ScanQuery.newScanQueryBuilder() + return Druids.newScanQueryBuilder() .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) .intervals(SelectQueryRunnerTest.I_0112_0114_SPEC) .batchSize(batchSize) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index faeea4f8380..ec18bdbcdc6 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerTestHelper; @@ -136,9 +137,9 @@ public class ScanQueryRunnerTest this.legacy = legacy; } - private ScanQuery.ScanQueryBuilder newTestQuery() + private Druids.ScanQueryBuilder newTestQuery() { - return ScanQuery.newScanQueryBuilder() + return 
Druids.newScanQueryBuilder() .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) .columns(Collections.emptyList()) .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index b5d083100c7..0d25a92e8c8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -359,9 +360,9 @@ public class BaseCalciteQueryTest extends CalciteTestBase return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable()); } - public static ScanQuery.ScanQueryBuilder newScanQueryBuilder() + public static Druids.ScanQueryBuilder newScanQueryBuilder() { - return new ScanQuery.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) .legacy(false); } From dd4ec1ac9c1194144e3ec98b811adc59598c8d8c Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 1 Feb 2019 15:12:17 -0800 Subject: [PATCH 2/3] Need to form queries --- .../druid/benchmark/query/ScanBenchmark.java | 90 ++++++++++++++++--- 1 file changed, 79 insertions(+), 11 deletions(-) diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java index 4085467a3a9..e177065d3bb 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java @@ -20,19 +20,17 @@ package org.apache.druid.benchmark.query; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator; import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; import org.apache.druid.benchmark.datagen.BenchmarkSchemas; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.Row; import org.apache.druid.hll.HyperLogLogHash; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; @@ -43,6 +41,7 @@ import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import org.apache.druid.query.extraction.DimExtractionFn; import 
org.apache.druid.query.extraction.IdentityExtractionFn; @@ -50,7 +49,6 @@ import org.apache.druid.query.extraction.LowerExtractionFn; import org.apache.druid.query.extraction.StrlenExtractionFn; import org.apache.druid.query.extraction.SubstringDimExtractionFn; import org.apache.druid.query.extraction.UpperExtractionFn; -import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.BoundDimFilter; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.InDimFilter; @@ -60,28 +58,32 @@ import org.apache.druid.query.scan.ScanQueryConfig; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanQueryQueryToolChest; import org.apache.druid.query.scan.ScanQueryRunnerFactory; -import org.apache.druid.query.search.SearchQueryConfig; -import org.apache.druid.query.search.SearchQueryQueryToolChest; -import org.apache.druid.query.search.SearchQueryRunnerFactory; -import org.apache.druid.query.search.SearchStrategySelector; +import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.timeline.SegmentId; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; import java.io.File; import java.io.IOException; @@ -92,6 +94,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @Fork(value = 1) @@ -99,9 +102,12 @@ import java.util.concurrent.ExecutorService; @Measurement(iterations = 25) public class ScanBenchmark { - @Param({"1"}) + @Param({"1", "4"}) private int numSegments; + @Param({"1", "2"}) + private int numProcessingThreads; + @Param({"750000"}) private int rowsPerSegment; @@ -281,7 +287,7 @@ if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); } - executorService = Execs.multiThreaded(numSegments, "SearchThreadPool"); + executorService = Execs.multiThreaded(numProcessingThreads, "ScanThreadPool"); setupQueries(); @@ -368,4 +374,66 @@ Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>()); return queryResult.toList(); } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public
void querySingleIncrementalIndex(Blackhole blackhole) + { + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + SegmentId.dummy("incIndex"), + new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex")) + ); + + List results = ScanBenchmark.runQuery(factory, runner, query); + blackhole.consume(results); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void querySingleQueryableIndex(Blackhole blackhole) + { + final QueryRunner<Result<ScanResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + SegmentId.dummy("qIndex"), + new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex")) + ); + + List results = ScanBenchmark.runQuery(factory, runner, query); + blackhole.consume(results); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndex(Blackhole blackhole) + { + List<QueryRunner<ScanResultValue>> runners = new ArrayList<>(); + QueryToolChest toolChest = factory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + final QueryRunner<Result<ScanResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + SegmentId.dummy(segmentName), + new QueryableIndexSegment(qIndexes.get(i), SegmentId.dummy(segmentName)) + ); + runners.add(toolChest.preMergeQueryDecoration(runner)); + } + + QueryRunner theRunner = toolChest.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(executorService, runners)), + toolChest + ) + ); + + Sequence<Result<ScanResultValue>> queryResult = theRunner.run( + QueryPlus.wrap(query), + new HashMap<>() + ); + List<Result<ScanResultValue>> results = queryResult.toList(); + blackhole.consume(results); + } } From 26930f8d2021d1d62322c54e0ec35e260137ab1d Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 1 Feb 2019 16:38:49 -0800 Subject: [PATCH 3/3] It runs. --- .../druid/benchmark/query/ScanBenchmark.java | 71 +++++-------------- 1 file changed, 18 insertions(+), 53 deletions(-) diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java index e177065d3bb..4c66e837356 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java @@ -43,12 +43,7 @@ import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; -import org.apache.druid.query.extraction.DimExtractionFn; -import org.apache.druid.query.extraction.IdentityExtractionFn; -import org.apache.druid.query.extraction.LowerExtractionFn; import org.apache.druid.query.extraction.StrlenExtractionFn; -import org.apache.druid.query.extraction.SubstringDimExtractionFn; -import org.apache.druid.query.extraction.UpperExtractionFn; import org.apache.druid.query.filter.BoundDimFilter; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.InDimFilter; @@ -96,10 +91,11 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +/* Works with 4GB heap size or greater. Otherwise there's a good chance of an OOME.
*/ @State(Scope.Benchmark) @Fork(value = 1) -@Warmup(iterations = 10) -@Measurement(iterations = 25) +@Warmup(iterations = 5) +@Measurement(iterations = 5) public class ScanBenchmark { @Param({"1", "4"}) @@ -108,7 +104,7 @@ public class ScanBenchmark @Param({"1", "2"}) private int numProcessingThreads; - @Param({"750000"}) + @Param({"250000"}) private int rowsPerSegment; @Param({"basic.A"}) @@ -174,14 +170,15 @@ } } + /* Just get everything */ private static Druids.ScanQueryBuilder basicA(final BenchmarkSchemaInfo basicSchema) { final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); return Druids.newScanQueryBuilder() .dataSource("blah") - .intervals(intervalSpec) - .query("123"); + .intervals(intervalSpec); + } private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema) @@ -202,11 +199,11 @@ dimHyperUniqueFilterVals.add(String.valueOf(i)); } - final List<DimFilter> dimFilters = new ArrayList<>(); - dimFilters.add(new InDimFilter("dimUniform", dimUniformFilterVals, null)); - dimFilters.add(new InDimFilter("dimHyperUnique", dimHyperUniqueFilterVals, null)); + DimFilter filter = new InDimFilter("dimHyperUnique", dimHyperUniqueFilterVals, null); - return Druids.newScanQueryBuilder(); // TODO + return Druids.newScanQueryBuilder() + .filters(filter) + .intervals(intervalSpec); } private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema) @@ -221,40 +218,9 @@ } final String dimName = "dimUniform"; - final List<DimFilter> dimFilters = new ArrayList<>(); - dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, IdentityExtractionFn.getInstance())); - dimFilters.add(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance())); - dimFilters.add(new BoundDimFilter(dimName, "100", "10000", true, true, true, new DimExtractionFn() - { - @Override - public byte[] getCacheKey() - { - return new byte[]{0xF}; - } - - @Override - public String apply(String value) - { - return String.valueOf(Long.parseLong(value) + 1); - } - - @Override - public boolean preservesOrdering() - { - return false; - } - - @Override - public ExtractionType getExtractionType() - { - return ExtractionType.ONE_TO_ONE; - } - }, null)); - dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, new LowerExtractionFn(null))); - dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, new UpperExtractionFn(null))); - dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, new SubstringDimExtractionFn(1, 3))); - - return Druids.newScanQueryBuilder(); // TODO + return Druids.newScanQueryBuilder() + .filters(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance())) + .intervals(intervalSpec); // TODO } private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema) @@ -271,12 +237,11 @@ } final String dimName = "dimUniform"; - final List<DimFilter> dimFilters = new ArrayList<>(); - dimFilters.add(new InDimFilter(dimName, dimUniformFilterVals, null)); - dimFilters.add(new SelectorDimFilter(dimName, "3", null)); - dimFilters.add(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null)); - return Druids.newScanQueryBuilder(); // TODO + + return Druids.newScanQueryBuilder() + .filters(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null)) + .intervals(intervalSpec); // TODO } @Setup
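Per the final commit message ("It runs."), the benchmark is now executable end to end. A minimal sketch of driving it programmatically through JMH's standard runner API; the harness class below is hypothetical (not part of this patch), and the parameter values simply mirror the @Param defaults above:

    // Hypothetical standalone harness; uses only the standard JMH OptionsBuilder/Runner API.
    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class ScanBenchmarkRunner
    {
      public static void main(String[] args) throws RunnerException
      {
        Options opts = new OptionsBuilder()
            .include(ScanBenchmark.class.getSimpleName())
            .param("rowsPerSegment", "250000")  // matches the reduced @Param value
            .param("schemaAndQuery", "basic.A")
            .jvmArgs("-Xmx4g")                  // per the heap-size comment added in this commit
            .forks(1)
            .build();
        new Runner(opts).run();
      }
    }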