Optimize filter for timeseries, search, and select queries (#2931)

* Optimize filter for timeseries, search, and select queries

* exception at failed toolchest type check

* took out query type check

* java7 error fix and test improvement
This commit is contained in:
Dave Li 2016-05-09 14:04:06 -04:00 committed by Gian Merlino
parent 2cfd337378
commit 79a54283d4
12 changed files with 281 additions and 67 deletions

View File

@ -55,7 +55,7 @@ public interface QueryRunnerFactory<T, QueryType extends Query<T>>
*
* @param queryExecutor ExecutorService to be used for parallel processing
* @param queryRunners Individual QueryRunner objects that produce some results
* @return a QueryRunner that, when asked, will use the ExecutorService to runt he base QueryRunners
* @return a QueryRunner that, when asked, will use the ExecutorService to run the base QueryRunners
*/
public QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);

View File

@ -424,9 +424,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
if (!(query instanceof GroupByQuery)) {
return runner.run(query, responseContext);
}
GroupByQuery groupByQuery = (GroupByQuery) query;
if (groupByQuery.getDimFilter() != null){
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());

View File

@ -241,10 +241,24 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
// Decorates the given runner for search queries. The new-side code wraps it in an anonymous QueryRunner
// that replaces the query's dimension filter (when one is set) with the result of its optimize() call
// before delegating; that wrapper is handed to intervalChunkingQueryRunnerDecorator.decorate, and the
// outcome is wrapped in a SearchThresholdAdjustingQueryRunner together with config.
@Override
public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(QueryRunner<Result<SearchResultValue>> runner)
public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner)
{
return new SearchThresholdAdjustingQueryRunner(
intervalChunkingQueryRunnerDecorator.decorate(runner, this),
intervalChunkingQueryRunnerDecorator.decorate(
new QueryRunner<Result<SearchResultValue>>()
{
@Override
public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> query, Map<String, Object> responseContext
)
{
// Unguarded cast: there is no instanceof check, so a query that is not a SearchQuery
// fails right here with a ClassCastException.
SearchQuery searchQuery = (SearchQuery) query;
// A null filter is passed through untouched; otherwise the query is rebuilt around the
// optimized filter.
if (searchQuery.getDimensionsFilter() != null) {
searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize());
}
// `runner` is the (final) parameter of the enclosing method, captured by this anonymous class.
return runner.run(searchQuery, responseContext);
}
} , this),
config
);
}

View File

@ -131,6 +131,21 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
);
}
/**
 * Builds a new {@code SearchQuery} from this query's data source, granularity, limit, segment spec,
 * dimensions, query spec, sort spec and context, with {@code dimFilter} supplied as the filter argument.
 *
 * @param dimFilter the filter handed to the new query's constructor
 * @return the newly constructed query
 */
public SearchQuery withDimFilter(DimFilter dimFilter)
{
  final SearchQuery withNewFilter = new SearchQuery(
      getDataSource(),
      dimFilter,
      granularity,
      limit,
      getQuerySegmentSpec(),
      dimensions,
      querySpec,
      sortSpec,
      getContext()
  );
  return withNewFilter;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{

View File

@ -188,6 +188,21 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
);
}
/**
 * Builds a new {@code SelectQuery} from this query's data source, segment spec, descending flag,
 * granularity, dimensions, metrics, paging spec and context, with {@code dimFilter} supplied as the
 * filter argument.
 *
 * @param dimFilter the filter handed to the new query's constructor
 * @return the newly constructed query
 */
public SelectQuery withDimFilter(DimFilter dimFilter)
{
  final SelectQuery withNewFilter = new SelectQuery(
      getDataSource(),
      getQuerySegmentSpec(),
      isDescending(),
      dimFilter,
      granularity,
      dimensions,
      metrics,
      pagingSpec,
      getContext()
  );
  return withNewFilter;
}
@Override
public String toString()
{

View File

@ -29,8 +29,10 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.granularity.QueryGranularity;
@ -261,9 +263,23 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
// Decorates the given runner for select queries. The new-side code wraps it in an anonymous QueryRunner
// that replaces the query's dimension filter (when one is set) with the result of its optimize() call
// before delegating; that wrapper is what gets handed to intervalChunkingQueryRunnerDecorator.decorate.
@Override
public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(QueryRunner<Result<SelectResultValue>> runner)
public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SelectResultValue>> runner)
{
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
return intervalChunkingQueryRunnerDecorator.decorate(
new QueryRunner<Result<SelectResultValue>>()
{
@Override
public Sequence<Result<SelectResultValue>> run(
Query<Result<SelectResultValue>> query, Map<String, Object> responseContext
)
{
// Unguarded cast: there is no instanceof check, so a query that is not a SelectQuery
// fails right here with a ClassCastException.
SelectQuery selectQuery = (SelectQuery) query;
// A null filter is passed through untouched; otherwise the query is rebuilt around the
// optimized filter.
if (selectQuery.getDimensionsFilter() != null) {
selectQuery = selectQuery.withDimFilter(selectQuery.getDimensionsFilter().optimize());
}
// `runner` is the (final) parameter of the enclosing method, captured by this anonymous class.
return runner.run(selectQuery, responseContext);
}
}, this);
}
@Override

View File

@ -152,6 +152,20 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
);
}
/**
 * Builds a new {@code TimeseriesQuery} from this query's data source, segment spec, descending flag,
 * granularity, aggregator specs, post-aggregator specs and context, with {@code dimFilter} supplied as
 * the filter argument.
 *
 * @param dimFilter the filter handed to the new query's constructor
 * @return the newly constructed query
 */
public TimeseriesQuery withDimFilter(DimFilter dimFilter)
{
  final TimeseriesQuery withNewFilter = new TimeseriesQuery(
      getDataSource(),
      getQuerySegmentSpec(),
      isDescending(),
      dimFilter,
      granularity,
      aggregatorSpecs,
      postAggregatorSpecs,
      getContext()
  );
  return withNewFilter;
}
@Override
public String toString()
{

View File

@ -25,6 +25,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.granularity.QueryGranularity;
@ -210,9 +212,23 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
// Decorates the given runner for timeseries queries. The new-side code wraps it in an anonymous
// QueryRunner that replaces the query's dimension filter (when one is set) with the result of its
// optimize() call before delegating; that wrapper is what gets handed to
// intervalChunkingQueryRunnerDecorator.decorate.
@Override
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner)
{
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
return intervalChunkingQueryRunnerDecorator.decorate(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query, Map<String, Object> responseContext
)
{
// Unguarded cast: there is no instanceof check, so a query that is not a TimeseriesQuery
// fails right here with a ClassCastException.
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
// A null filter is passed through untouched; otherwise the query is rebuilt around the
// optimized filter.
if (timeseriesQuery.getDimensionsFilter() != null) {
timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
}
// `runner` is the (final) parameter of the enclosing method, captured by this anonymous class.
return runner.run(timeseriesQuery, responseContext);
}
}, this);
}
@Override

View File

@ -418,27 +418,23 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
Query<Result<TopNResultValue>> query, Map<String, Object> responseContext
)
{
if (!(query instanceof TopNQuery)) {
return runner.run(query, responseContext);
TopNQuery topNQuery = (TopNQuery) query;
if (topNQuery.getDimensionsFilter() != null) {
topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
}
final TopNQuery delegateTopNQuery = topNQuery;
if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
return runner.run(
delegateTopNQuery.withDimensionSpec(
new DefaultDimensionSpec(
dimensionSpec.getDimension(),
dimensionSpec.getOutputName()
)
), responseContext
);
} else {
TopNQuery topNQuery = (TopNQuery) query;
if (topNQuery.getDimensionsFilter() != null) {
topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
}
final TopNQuery delegateTopNQuery = topNQuery;
if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
return runner.run(
delegateTopNQuery.withDimensionSpec(
new DefaultDimensionSpec(
dimensionSpec.getDimension(),
dimensionSpec.getOutputName()
)
), responseContext
);
} else {
return runner.run(delegateTopNQuery, responseContext);
}
return runner.run(delegateTopNQuery, responseContext);
}
}
}

View File

@ -82,12 +82,15 @@ public class SearchQueryRunnerTest
}
private final QueryRunner runner;
private final QueryRunner decoratedRunner;
public SearchQueryRunnerTest(
QueryRunner runner
)
{
this.runner = runner;
this.decoratedRunner = toolChest.postMergeQueryDecoration(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)));
}
@Test
@ -342,34 +345,33 @@ public class SearchQueryRunnerTest
true,
null,
true,
false
true
);
checkSearchQuery(
Druids.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.filters(
new ExtractionDimFilter(
QueryRunnerTestHelper.qualityDimension,
automotiveSnowman,
lookupExtractionFn,
null
)
)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.dimensions(
new ExtractionDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
null,
lookupExtractionFn,
null
)
)
.query("")
.build(),
expectedHits
);
SearchQuery query = Druids.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.filters(
new ExtractionDimFilter(
QueryRunnerTestHelper.qualityDimension,
automotiveSnowman,
lookupExtractionFn,
null
)
)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.dimensions(
new ExtractionDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
null,
lookupExtractionFn,
null
)
)
.query("")
.build();
checkSearchQuery(query, expectedHits);
}
@Test
@ -551,6 +553,7 @@ public class SearchQueryRunnerTest
/**
 * Checks the query against {@code expectedResults} twice: first through {@code runner}, then through
 * {@code decoratedRunner}.
 *
 * @param searchQuery     the query to run
 * @param expectedResults the hits each runner is expected to produce
 */
private void checkSearchQuery(Query searchQuery, List<SearchHit> expectedResults)
{
  final QueryRunner[] runnersToCheck = {runner, decoratedRunner};
  for (QueryRunner candidate : runnersToCheck) {
    checkSearchQuery(searchQuery, candidate, expectedResults);
  }
}
private void checkSearchQuery(Query searchQuery, QueryRunner runner, List<SearchHit> expectedResults)
@ -559,31 +562,31 @@ public class SearchQueryRunnerTest
runner.run(searchQuery, ImmutableMap.of()),
Lists.<Result<SearchResultValue>>newArrayList()
);
List<SearchHit> copy = ImmutableList.copyOf(expectedResults);
List<SearchHit> copy = Lists.newLinkedList(expectedResults);
for (Result<SearchResultValue> result : results) {
Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), result.getTimestamp());
Assert.assertTrue(result.getValue() instanceof Iterable);
Iterable<SearchHit> resultValues = result.getValue();
for (SearchHit resultValue : resultValues) {
int index = expectedResults.indexOf(resultValue);
int index = copy.indexOf(resultValue);
if (index < 0) {
fail(
copy, results,
expectedResults, results,
"No result found containing " + resultValue.getDimension() + " and " + resultValue.getValue()
);
}
SearchHit expected = expectedResults.remove(index);
SearchHit expected = copy.remove(index);
if (!resultValue.toString().equals(expected.toString())) {
fail(
copy, results,
expectedResults, results,
"Invalid count for " + resultValue + ".. which was expected to be " + expected.getCount()
);
}
}
}
if (!expectedResults.isEmpty()) {
fail(copy, results, "Some expected results are not shown: " + expectedResults);
if (!copy.isEmpty()) {
fail(expectedResults, results, "Some expected results are not shown: " + copy);
}
}

View File

@ -99,16 +99,18 @@ public class SelectQueryRunnerTest
);
public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);
private static final SelectQueryQueryToolChest toolChest = new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
@Parameterized.Parameters(name = "{0}:descending={1}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(
QueryRunnerTestHelper.makeQueryRunners(
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
toolChest,
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
@ -455,6 +457,61 @@ public class SelectQueryRunnerTest
}
}
/**
 * Runs a select query whose selector filter on the market dimension goes through a lookup extraction
 * function, once via the plain runner and once via the tool-chest-decorated runner, and passes both
 * result sets to {@code verify} against the same expected results.
 */
@Test
public void testSelectWithFilterLookupExtractionFn()
{
  final Map<String, String> lookupMap = new HashMap<>();
  lookupMap.put("total_market", "replaced");
  final LookupExtractionFn marketLookupFn = new LookupExtractionFn(
      new MapLookupExtractor(lookupMap, false),
      false,
      null,
      true,
      true
  );
  final SelectorDimFilter marketFilter = new SelectorDimFilter(
      QueryRunnerTestHelper.marketDimension,
      "replaced",
      marketLookupFn
  );

  final SelectQuery query = newTestQuery()
      .intervals(I_0112_0114)
      .filters(marketFilter)
      .granularity(QueryRunnerTestHelper.dayGran)
      .dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.qualityDimension))
      .metrics(Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric))
      .build();

  // Plain runner, no tool chest decoration.
  final Iterable<Result<SelectResultValue>> plainResults = Sequences.toList(
      runner.run(query, Maps.newHashMap()),
      Lists.<Result<SelectResultValue>>newArrayList()
  );

  // Same query through the pre-merge / merge / post-merge chain of the tool chest.
  final Iterable<Result<SelectResultValue>> decoratedResults = Sequences.toList(
      toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)))
               .run(query, Maps.<String, Object>newHashMap()),
      Lists.<Result<SelectResultValue>>newArrayList()
  );

  final String[] columnSpecs = new String[]{
      EventHolder.timestampKey + ":TIME",
      null,
      QueryRunnerTestHelper.qualityDimension + ":STRING",
      null,
      null,
      QueryRunnerTestHelper.indexMetric + ":FLOAT"
  };
  // filtered values with day granularity
  final String[] firstDayRows = new String[]{
      "2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
      "2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000"
  };
  final String[] secondDayRows = new String[]{
      "2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
      "2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875"
  };
  final List<List<Map<String, Object>>> events = toEvents(columnSpecs, firstDayRows, secondDayRows);

  final PagingOffset pagingOffset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
  final List<Result<SelectResultValue>> expectedResults = toExpected(
      events,
      pagingOffset.startOffset(),
      pagingOffset.threshold()
  );

  verify(expectedResults, plainResults);
  verify(expectedResults, decoratedResults);
}
@Test
public void testFullSelectNoResults()
{

View File

@ -37,6 +37,7 @@ import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
@ -44,6 +45,7 @@ import io.druid.query.filter.InDimFilter;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.RegexDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.lookup.LookupExtractionFn;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.TestHelper;
import org.joda.time.DateTime;
@ -2266,4 +2268,73 @@ public class TimeseriesQueryRunnerTest
);
TestHelper.assertExpectedResults(expectedResults, results);
}
/**
 * Runs a timeseries query whose selector filter on the market dimension goes through a lookup extraction
 * function, once via the plain runner and once via a runner decorated by a
 * {@code TimeseriesQueryQueryToolChest}, and checks both result sets against the same expected results.
 */
@Test
public void testTimeSeriesWithSelectionFilterLookupExtractionFn()
{
  final Map<String, String> lookupMap = new HashMap<>();
  lookupMap.put("spot", "upfront");
  final LookupExtractionFn marketLookupFn = new LookupExtractionFn(
      new MapLookupExtractor(lookupMap, false),
      true,
      null,
      true,
      true
  );

  final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
      .dataSource(QueryRunnerTestHelper.dataSource)
      .granularity(QueryRunnerTestHelper.dayGran)
      .filters(
          new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "upfront", marketLookupFn)
      )
      .intervals(QueryRunnerTestHelper.firstToThird)
      .aggregators(
          Arrays.<AggregatorFactory>asList(
              QueryRunnerTestHelper.rowsCount,
              QueryRunnerTestHelper.indexLongSum,
              QueryRunnerTestHelper.qualityUniques
          )
      )
      .postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
      .build();

  final Result<TimeseriesResultValue> firstDay = new Result<>(
      new DateTime("2011-04-01"),
      new TimeseriesResultValue(
          ImmutableMap.<String, Object>of(
              "rows", 11L,
              "index", 3783L,
              "addRowsIndexConstant", 3795.0,
              "uniques", QueryRunnerTestHelper.UNIQUES_9
          )
      )
  );
  final Result<TimeseriesResultValue> secondDay = new Result<>(
      new DateTime("2011-04-02"),
      new TimeseriesResultValue(
          ImmutableMap.<String, Object>of(
              "rows", 11L,
              "index", 3313L,
              "addRowsIndexConstant", 3325.0,
              "uniques", QueryRunnerTestHelper.UNIQUES_9
          )
      )
  );
  final List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(firstDay, secondDay);

  // Plain runner, no tool chest decoration.
  final Iterable<Result<TimeseriesResultValue>> plainResults = Sequences.toList(
      runner.run(query, CONTEXT),
      Lists.<Result<TimeseriesResultValue>>newArrayList()
  );
  TestHelper.assertExpectedResults(expectedResults, plainResults);

  // Same query through the pre-merge / merge / post-merge chain of a fresh tool chest.
  final TimeseriesQueryQueryToolChest toolChest = new TimeseriesQueryQueryToolChest(
      QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
  );
  final QueryRunner<Result<TimeseriesResultValue>> decoratedRunner = toolChest.postMergeQueryDecoration(
      toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))
  );
  final Iterable<Result<TimeseriesResultValue>> decoratedResults = Sequences.toList(
      decoratedRunner.run(query, CONTEXT),
      Lists.<Result<TimeseriesResultValue>>newArrayList()
  );
  TestHelper.assertExpectedResults(expectedResults, decoratedResults);
}
}