From 79a54283d43e7e5200bbadbc9de2acc3d2323e2c Mon Sep 17 00:00:00 2001
From: Dave Li
Date: Mon, 9 May 2016 14:04:06 -0400
Subject: [PATCH] Optimize filter for timeseries, search, and select queries
 (#2931)

* Optimize filter for timeseries, search, and select queries

* exception at failed toolchest type check

* took out query type check

* java7 error fix and test improvement
---
 .../io/druid/query/QueryRunnerFactory.java    |  2 +-
 .../groupby/GroupByQueryQueryToolChest.java   |  3 -
 .../search/SearchQueryQueryToolChest.java     | 18 ++++-
 .../query/search/search/SearchQuery.java      | 15 ++++
 .../io/druid/query/select/SelectQuery.java    | 15 ++++
 .../select/SelectQueryQueryToolChest.java     | 20 +++++-
 .../query/timeseries/TimeseriesQuery.java     | 14 ++++
 .../TimeseriesQueryQueryToolChest.java        | 20 +++++-
 .../query/topn/TopNQueryQueryToolChest.java   | 36 +++++-----
 .../query/search/SearchQueryRunnerTest.java   | 69 +++++++++---------
 .../query/select/SelectQueryRunnerTest.java   | 65 +++++++++++++++--
 .../timeseries/TimeseriesQueryRunnerTest.java | 71 +++++++++++++++++++
 12 files changed, 281 insertions(+), 67 deletions(-)

diff --git a/processing/src/main/java/io/druid/query/QueryRunnerFactory.java b/processing/src/main/java/io/druid/query/QueryRunnerFactory.java
index 9dd8bb4973d..3eb2eae4e0a 100644
--- a/processing/src/main/java/io/druid/query/QueryRunnerFactory.java
+++ b/processing/src/main/java/io/druid/query/QueryRunnerFactory.java
@@ -55,7 +55,7 @@ public interface QueryRunnerFactory<T, QueryType extends Query<T>>
    *
    * @param queryExecutor ExecutorService to be used for parallel processing
    * @param queryRunners  Individual QueryRunner objects that produce some results
-   * @return a QueryRunner that, when asked, will use the ExecutorService to runt he base QueryRunners
+   * @return a QueryRunner that, when asked, will use the ExecutorService to run the base QueryRunners
    */
   public QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
 
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index a3c77245a83..fa7bbc0b86e 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -424,9 +424,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
               @Override
               public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
               {
-                if (!(query instanceof GroupByQuery)) {
-                  return runner.run(query, responseContext);
-                }
                 GroupByQuery groupByQuery = (GroupByQuery) query;
                 if (groupByQuery.getDimFilter() != null){
                   groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
index a196c085232..0c37d478e60 100644
--- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
@@ -241,10 +241,24 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
   }
 
   @Override
-  public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(QueryRunner<Result<SearchResultValue>> runner)
+  public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner)
   {
     return new SearchThresholdAdjustingQueryRunner(
-        intervalChunkingQueryRunnerDecorator.decorate(runner, this),
+        intervalChunkingQueryRunnerDecorator.decorate(
+            new QueryRunner<Result<SearchResultValue>>()
+            {
+              @Override
+              public Sequence<Result<SearchResultValue>> run(
+                  Query<Result<SearchResultValue>> query, Map<String, Object> responseContext
+              )
+              {
+                SearchQuery searchQuery = (SearchQuery) query;
+                if (searchQuery.getDimensionsFilter() != null) {
+                  searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize());
+                }
+                return runner.run(searchQuery, responseContext);
+              }
+            } , this),
         config
     );
   }
diff --git a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java
index 71314185d78..b724538ac4b 100644
--- a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java
+++ b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java
@@ -131,6 +131,21 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
     );
   }
 
+  public SearchQuery withDimFilter(DimFilter dimFilter)
+  {
+    return new SearchQuery(
+        getDataSource(),
+        dimFilter,
+        granularity,
+        limit,
+        getQuerySegmentSpec(),
+        dimensions,
+        querySpec,
+        sortSpec,
+        getContext()
+    );
+  }
+
   @JsonProperty("filter")
   public DimFilter getDimensionsFilter()
   {
diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java
index e36bb210562..2e3e36304c7 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQuery.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java
@@ -188,6 +188,21 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     );
   }
 
+  public SelectQuery withDimFilter(DimFilter dimFilter)
+  {
+    return new SelectQuery(
+        getDataSource(),
+        getQuerySegmentSpec(),
+        isDescending(),
+        dimFilter,
+        granularity,
+        dimensions,
+        metrics,
+        pagingSpec,
+        getContext()
+    );
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
index 5438f35ba9b..3bf1bd4ad02 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
@@ -29,8 +29,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
 import com.metamx.common.StringUtils;
 import com.metamx.common.guava.Comparators;
+import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.granularity.QueryGranularity;
@@ -261,9 +263,23 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
   }
 
   @Override
-  public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(QueryRunner<Result<SelectResultValue>> runner)
+  public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SelectResultValue>> runner)
   {
-    return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
+    return intervalChunkingQueryRunnerDecorator.decorate(
+        new QueryRunner<Result<SelectResultValue>>()
+        {
+          @Override
+          public Sequence<Result<SelectResultValue>> run(
+              Query<Result<SelectResultValue>> query, Map<String, Object> responseContext
+          )
+          {
+            SelectQuery selectQuery = (SelectQuery) query;
+            if (selectQuery.getDimensionsFilter() != null) {
+              selectQuery = selectQuery.withDimFilter(selectQuery.getDimensionsFilter().optimize());
+            }
+            return runner.run(selectQuery, responseContext);
+          }
+        }, this);
   }
 
   @Override
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
index 72a41a7d125..c2c71e5d7a0 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
@@ -152,6 +152,20 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
     );
   }
 
+  public TimeseriesQuery withDimFilter(DimFilter dimFilter)
+  {
+    return new TimeseriesQuery(
+        getDataSource(),
+        getQuerySegmentSpec(),
+        isDescending(),
+        dimFilter,
+        granularity,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        getContext()
+    );
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index f82e493f023..bfa11b0210c 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.granularity.QueryGranularity;
@@ -210,9 +212,23 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
   }
 
   @Override
-  public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
+  public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner)
   {
-    return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
+    return intervalChunkingQueryRunnerDecorator.decorate(
+        new QueryRunner<Result<TimeseriesResultValue>>()
+        {
+          @Override
+          public Sequence<Result<TimeseriesResultValue>> run(
+              Query<Result<TimeseriesResultValue>> query, Map<String, Object> responseContext
+          )
+          {
+            TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
+            if (timeseriesQuery.getDimensionsFilter() != null) {
+              timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
+            }
+            return runner.run(timeseriesQuery, responseContext);
+          }
+        }, this);
   }
 
   @Override
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
index 20ca60c6612..cfb2a5b2f20 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -418,27 +418,23 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
               Query<Result<TopNResultValue>> query, Map<String, Object> responseContext
           )
           {
-            if (!(query instanceof TopNQuery)) {
-              return runner.run(query, responseContext);
+            TopNQuery topNQuery = (TopNQuery) query;
+            if (topNQuery.getDimensionsFilter() != null) {
+              topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
+            }
+            final TopNQuery delegateTopNQuery = topNQuery;
+            if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
+              final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
+              return runner.run(
+                  delegateTopNQuery.withDimensionSpec(
+                      new DefaultDimensionSpec(
+                          dimensionSpec.getDimension(),
+                          dimensionSpec.getOutputName()
+                      )
+                  ), responseContext
+              );
             } else {
-              TopNQuery topNQuery = (TopNQuery) query;
-              if (topNQuery.getDimensionsFilter() != null) {
-                topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
-              }
-              final TopNQuery delegateTopNQuery = topNQuery;
-              if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
-                final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
-                return runner.run(
-                    delegateTopNQuery.withDimensionSpec(
-                        new DefaultDimensionSpec(
-                            dimensionSpec.getDimension(),
-                            dimensionSpec.getOutputName()
-                        )
-                    ), responseContext
-                );
-              } else {
-                return runner.run(delegateTopNQuery, responseContext);
-              }
+              return runner.run(delegateTopNQuery, responseContext);
             }
           }
         }
diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java
index 0b968dd93b7..f3cf3284d0f 100644
--- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java
@@ -82,12 +82,15 @@ public class SearchQueryRunnerTest
   }
 
   private final QueryRunner runner;
+  private final QueryRunner decoratedRunner;
 
   public SearchQueryRunnerTest(
       QueryRunner runner
   )
   {
     this.runner = runner;
+    this.decoratedRunner = toolChest.postMergeQueryDecoration(
+        toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)));
   }
 
   @Test
@@ -342,34 +345,33 @@ public class SearchQueryRunnerTest
         true,
         null,
         true,
-        false
+        true
     );
 
-    checkSearchQuery(
-        Druids.newSearchQueryBuilder()
-              .dataSource(QueryRunnerTestHelper.dataSource)
-              .granularity(QueryRunnerTestHelper.allGran)
-              .filters(
-                  new ExtractionDimFilter(
-                      QueryRunnerTestHelper.qualityDimension,
-                      automotiveSnowman,
-                      lookupExtractionFn,
-                      null
-                  )
-              )
-              .intervals(QueryRunnerTestHelper.fullOnInterval)
-              .dimensions(
-                  new ExtractionDimensionSpec(
-                      QueryRunnerTestHelper.qualityDimension,
-                      null,
-                      lookupExtractionFn,
-                      null
-                  )
-              )
-              .query("☃")
-              .build(),
-        expectedHits
-    );
+    SearchQuery query = Druids.newSearchQueryBuilder()
+                              .dataSource(QueryRunnerTestHelper.dataSource)
+                              .granularity(QueryRunnerTestHelper.allGran)
+                              .filters(
+                                  new ExtractionDimFilter(
+                                      QueryRunnerTestHelper.qualityDimension,
+                                      automotiveSnowman,
+                                      lookupExtractionFn,
+                                      null
+                                  )
+                              )
+                              .intervals(QueryRunnerTestHelper.fullOnInterval)
+                              .dimensions(
+                                  new ExtractionDimensionSpec(
+                                      QueryRunnerTestHelper.qualityDimension,
+                                      null,
+                                      lookupExtractionFn,
+                                      null
+                                  )
+                              )
+                              .query("☃")
+                              .build();
+
+    checkSearchQuery(query, expectedHits);
   }
 
   @Test
@@ -551,6 +553,7 @@ public class SearchQueryRunnerTest
   private void checkSearchQuery(Query searchQuery, List<SearchHit> expectedResults)
   {
     checkSearchQuery(searchQuery, runner, expectedResults);
+    checkSearchQuery(searchQuery, decoratedRunner, expectedResults);
   }
 
   private void checkSearchQuery(Query searchQuery, QueryRunner runner, List<SearchHit> expectedResults)
@@ -559,31 +562,31 @@ public class SearchQueryRunnerTest
         runner.run(searchQuery, ImmutableMap.<String, Object>of()),
         Lists.<Result<SearchResultValue>>newArrayList()
     );
-    List<SearchHit> copy = ImmutableList.copyOf(expectedResults);
+    List<SearchHit> copy = Lists.newLinkedList(expectedResults);
     for (Result<SearchResultValue> result : results) {
       Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), result.getTimestamp());
       Assert.assertTrue(result.getValue() instanceof Iterable);
 
       Iterable<SearchHit> resultValues = result.getValue();
       for (SearchHit resultValue : resultValues) {
-        int index = expectedResults.indexOf(resultValue);
+        int index = copy.indexOf(resultValue);
         if (index < 0) {
           fail(
-              copy, results,
+              expectedResults, results,
               "No result found containing " + resultValue.getDimension() + " and " + resultValue.getValue()
           );
         }
-        SearchHit expected = expectedResults.remove(index);
+        SearchHit expected = copy.remove(index);
         if (!resultValue.toString().equals(expected.toString())) {
           fail(
-              copy, results,
+              expectedResults, results,
               "Invalid count for " + resultValue + "..  which was expected to be " + expected.getCount()
           );
         }
       }
     }
-    if (!expectedResults.isEmpty()) {
-      fail(copy, results, "Some expected results are not shown: " + expectedResults);
+    if (!copy.isEmpty()) {
+      fail(expectedResults, results, "Some expected results are not shown: " + copy);
     }
   }
 
diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
index 9f8ddd92b0e..a7bc0256380 100644
--- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
@@ -99,16 +99,18 @@ public class SelectQueryRunnerTest
   );
   public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);
 
+  private static final SelectQueryQueryToolChest toolChest = new SelectQueryQueryToolChest(
+      new DefaultObjectMapper(),
+      QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+  );
+
   @Parameterized.Parameters(name = "{0}:descending={1}")
   public static Iterable<Object[]> constructorFeeder() throws IOException
   {
     return QueryRunnerTestHelper.cartesian(
         QueryRunnerTestHelper.makeQueryRunners(
             new SelectQueryRunnerFactory(
-                new SelectQueryQueryToolChest(
-                    new DefaultObjectMapper(),
-                    QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
-                ),
+                toolChest,
                 new SelectQueryEngine(),
                 QueryRunnerTestHelper.NOOP_QUERYWATCHER
             )
@@ -455,6 +457,61 @@ public class SelectQueryRunnerTest
     }
   }
 
+  @Test
+  public void testSelectWithFilterLookupExtractionFn () {
+
+    Map<String, String> extractionMap = new HashMap<>();
+    extractionMap.put("total_market","replaced");
+    MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false);
+    LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
+    SelectQuery query = newTestQuery()
+        .intervals(I_0112_0114)
+        .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "replaced", lookupExtractionFn))
+        .granularity(QueryRunnerTestHelper.dayGran)
+        .dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.qualityDimension))
+        .metrics(Lists.newArrayList(QueryRunnerTestHelper.indexMetric))
+        .build();
+
+    Iterable<Result<SelectResultValue>> results = Sequences.toList(
+        runner.run(query, Maps.newHashMap()),
+        Lists.<Result<SelectResultValue>>newArrayList()
+    );
+    Iterable<Result<SelectResultValue>> resultsOptimize = Sequences.toList(
+        toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))).
+            run(query, Maps.newHashMap()), Lists.<Result<SelectResultValue>>newArrayList()
+    );
+
+    final List<List<Map<String, Object>>> events = toEvents(
+        new String[]{
+            EventHolder.timestampKey + ":TIME",
+            null,
+            QueryRunnerTestHelper.qualityDimension + ":STRING",
+            null,
+            null,
+            QueryRunnerTestHelper.indexMetric + ":FLOAT"
+        },
+        // filtered values with day granularity
+        new String[]{
+            "2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
+            "2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000"
+        },
+        new String[]{
+            "2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
+            "2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875"
+        }
+    );
+
+    PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
+    List<Result<SelectResultValue>> expectedResults = toExpected(
+        events,
+        offset.startOffset(),
+        offset.threshold()
+    );
+
+    verify(expectedResults, results);
+    verify(expectedResults, resultsOptimize);
+  }
+
   @Test
   public void testFullSelectNoResults()
   {
diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index d57436de47f..e70f17bf67d 100644
--- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -37,6 +37,7 @@ import io.druid.query.aggregation.DoubleMinAggregatorFactory;
 import io.druid.query.aggregation.FilteredAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.extraction.MapLookupExtractor;
 import io.druid.query.filter.AndDimFilter;
 import io.druid.query.filter.BoundDimFilter;
 import io.druid.query.filter.DimFilter;
@@ -44,6 +45,7 @@ import io.druid.query.filter.InDimFilter;
 import io.druid.query.filter.NotDimFilter;
 import io.druid.query.filter.RegexDimFilter;
 import io.druid.query.filter.SelectorDimFilter;
+import io.druid.query.lookup.LookupExtractionFn;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.segment.TestHelper;
 import org.joda.time.DateTime;
@@ -2266,4 +2268,73 @@ public class TimeseriesQueryRunnerTest
     );
     TestHelper.assertExpectedResults(expectedResults, results);
   }
+
+  @Test
+  public void testTimeSeriesWithSelectionFilterLookupExtractionFn()
+  {
+    Map<String, String> extractionMap = new HashMap<>();
+    extractionMap.put("spot","upfront");
+
+    MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false);
+    LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, true, null, true, true);
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryRunnerTestHelper.dayGran)
+                                  .filters(
+                                      new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "upfront", lookupExtractionFn)
+                                  )
+                                  .intervals(QueryRunnerTestHelper.firstToThird)
+                                  .aggregators(
+                                      Arrays.asList(
+                                          QueryRunnerTestHelper.rowsCount,
+                                          QueryRunnerTestHelper.indexLongSum,
+                                          QueryRunnerTestHelper.qualityUniques
+                                      )
+                                  )
+                                  .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant))
+                                  .build();
+
+    List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            new DateTime("2011-04-01"),
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of(
+                    "rows", 11L,
+                    "index", 3783L,
+                    "addRowsIndexConstant", 3795.0,
+                    "uniques", QueryRunnerTestHelper.UNIQUES_9
+                )
+            )
+        ),
+        new Result<>(
+            new DateTime("2011-04-02"),
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of(
+                    "rows", 11L,
+                    "index", 3313L,
+                    "addRowsIndexConstant", 3325.0,
+                    "uniques", QueryRunnerTestHelper.UNIQUES_9
+                )
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
+        runner.run(query, CONTEXT),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+
+    TimeseriesQueryQueryToolChest toolChest = new TimeseriesQueryQueryToolChest(
+        QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+    );
+    QueryRunner<Result<TimeseriesResultValue>> optimizedRunner = toolChest.postMergeQueryDecoration(
+        toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)));
+    Iterable<Result<TimeseriesResultValue>> results2 = Sequences.toList(
+        optimizedRunner.run(query, CONTEXT),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+    TestHelper.assertExpectedResults(expectedResults, results2);
+
+  }
 }
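
Reviewer note: the search, select, and timeseries tool chests above all apply the same pre-merge decoration, so the shape is easier to see in isolation. The sketch below is illustrative only and is not part of the patch (the class name FilterOptimizingRunnerSketch is made up); it assumes only the APIs visible in the diff: QueryRunner.run, the new withDimFilter copy method, getDimensionsFilter, and DimFilter.optimize().

// Minimal sketch of the decoration pattern used in preMergeQueryDecoration
// (illustrative, not part of this patch): wrap the delegate runner so the
// query's DimFilter is optimize()d once before the query is executed.
import com.metamx.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesResultValue;

import java.util.Map;

public class FilterOptimizingRunnerSketch
{
  // Shown for timeseries; search and select follow the same shape with their own result types.
  public static QueryRunner<Result<TimeseriesResultValue>> wrap(
      final QueryRunner<Result<TimeseriesResultValue>> baseRunner
  )
  {
    return new QueryRunner<Result<TimeseriesResultValue>>()
    {
      @Override
      public Sequence<Result<TimeseriesResultValue>> run(
          Query<Result<TimeseriesResultValue>> query, Map<String, Object> responseContext
      )
      {
        TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
        if (timeseriesQuery.getDimensionsFilter() != null) {
          // withDimFilter(...) is the copy method added by this patch.
          timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
        }
        return baseRunner.run(timeseriesQuery, responseContext);
      }
    };
  }
}

The new tests exercise exactly this path by comparing the plain runner's results against toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))).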