Implement force push down for nested group by query (#5471)

* Force nested query push down

* Code review changes
Samarth Jain 2018-10-22 13:43:47 -07:00 committed by Jihoon Son
parent 1e82b6291e
commit 359576a80b
15 changed files with 1737 additions and 67 deletions
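Before the file-by-file diff, a quick orientation: with this change the broker can push an entire nested groupBy query down to the historicals when the query context sets forcePushDownNestedQuery. The eligibility check lives in GroupByQueryQueryToolChest.isNestedQueryPushDown below; the following self-contained sketch models that predicate with plain booleans (the parameter names are illustrative stand-ins for the real Query and GroupByStrategy types):

public class PushDownEligibilitySketch
{
  // Illustrative model of the checks in isNestedQueryPushDown; each boolean
  // stands in for a condition evaluated on the real GroupByQuery.
  static boolean isNestedQueryPushDown(
      boolean dataSourceIsQuery,        // q.getDataSource() instanceof QueryDataSource
      boolean forcePushDownFlag,        // context flag "forcePushDownNestedQuery"
      boolean hasSubtotalsSpec,         // q.getSubtotalsSpec() != null
      boolean strategySupportsPushDown  // true for groupBy v2, false for v1
  )
  {
    return dataSourceIsQuery && forcePushDownFlag && !hasSubtotalsSpec && strategySupportsPushDown;
  }

  public static void main(String[] args)
  {
    // Eligible: nested query, flag set, no subtotals, groupBy v2.
    System.out.println(isNestedQueryPushDown(true, true, false, true));  // true
    // Not eligible: groupBy v1 does not support nested query push down.
    System.out.println(isNestedQueryPushDown(true, true, false, false)); // false
  }
}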

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITNestedQueryPushDownTest extends AbstractIndexerTest
{
private static final String WIKITICKER_DATA_SOURCE = "wikiticker";
private static final String WIKITICKER_INDEX_TASK = "/indexer/wikiticker_index_task.json";
private static final String WIKITICKER_QUERIES_RESOURCE = "/queries/nestedquerypushdown_queries.json";
@Inject
private CoordinatorResourceTestClient coordinatorClient;
@Inject
private TestQueryHelper queryHelper;
private static final Logger LOG = new Logger(ITNestedQueryPushDownTest.class);
@Inject
private IntegrationTestingConfig config;
@Inject
ClientInfoResourceTestClient clientInfoResourceTestClient;
@Test
public void testIndexData()
{
try {
loadData();
queryHelper.testQueriesFromFile(WIKITICKER_QUERIES_RESOURCE, 2);
}
catch (Exception e) {
LOG.error(e, "Error while testing");
throw Throwables.propagate(e);
}
}
private void loadData() throws Exception
{
final String taskID = indexer.submitTask(getTaskAsString(WIKITICKER_INDEX_TASK));
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
RetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(WIKITICKER_DATA_SOURCE), "Segment Load"
);
}
}

View File

@ -0,0 +1,84 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "wikiticker",
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "day",
"queryGranularity": "none",
"intervals": [
"2015-09-12/2015-09-13"
]
},
"parser": {
"type": "hadoopyString",
"parseSpec": {
"format": "json",
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user"
]
},
"timestampSpec": {
"format": "auto",
"column": "time"
}
}
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "added",
"type": "longSum",
"fieldName": "added"
},
{
"name": "deleted",
"type": "longSum",
"fieldName": "deleted"
},
{
"name": "delta",
"type": "longSum",
"fieldName": "delta"
},
{
"name": "user_unique",
"type": "hyperUnique",
"fieldName": "user"
}
]
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/examples/quickstart/tutorial",
"filter": "wikiticker-2015-09-12-sampled.json.gz"
}
},
"tuningConfig": {
"type": "index",
"targetPartitionSize" : 10000
}
}
}

View File

@ -0,0 +1,303 @@
[
{
"description": "Nested group by double agg query with force push down",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": "wikiticker",
"intervals": [
"2015-09-12/2015-09-13"
],
"granularity": "all",
"dimensions": [
"channel",
"user"
],
"metric": "added",
"aggregations": [
{
"type": "longSum",
"name": "sumAdded",
"fieldName": "added"
}
]
}
},
"granularity": "all",
"dimension": "channel",
"aggregations": [
{
"type": "longSum",
"name": "groupedSumAdded",
"fieldName": "sumAdded"
}
],
"intervals": [
"2015-09-12/2015-09-13"
],
"context": {
"forcePushDownNestedQuery":"true"
}
},
"expectedResults": [
{
"version" : "v1",
"timestamp" : "2015-09-12T00:00:00.000Z",
"event" : {
"groupedSumAdded" : 9385573
}
}
]
},
{
"description": "Nested group by query with force push down and renamed dimensions",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": "wikiticker",
"intervals": [
"2015-09-12/2015-09-13"
],
"granularity": "all",
"dimensions": [
{"dimension" : "channel", "outputName" :"renamedChannel"},
{"dimension" : "user", "outputName" :"renamedUser"}
],
"metric": "added",
"aggregations": [
{
"type": "longSum",
"name": "sumAdded",
"fieldName": "added"
}
]
}
},
"granularity": "all",
"dimension": "renamedChannel",
"aggregations": [
{
"type": "longSum",
"name": "groupedSumAdded",
"fieldName": "sumAdded"
}
],
"intervals": [
"2015-09-12/2015-09-13"
],
"context": {
"forcePushDownNestedQuery":"true"
}
},
"expectedResults": [
{
"version" : "v1",
"timestamp" : "2015-09-12T00:00:00.000Z",
"event" : {
"groupedSumAdded" : 9385573
}
}
]
},
{
"description": "Nested group by query with force push down and filter on outer and inner query",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": "wikiticker",
"intervals": [
"2015-09-12/2015-09-13"
],
"granularity": "all",
"dimensions": [
{"dimension" : "channel", "outputName" :"renamedChannel"},
{"dimension" : "user", "outputName" :"renamedUser"}
],
"metric": "added",
"aggregations": [
{
"type": "longSum",
"name": "sumAdded",
"fieldName": "added"
}
],
"filter": {
"type": "or",
"fields": [
{
"type": "selector",
"dimension": "channel",
"value": "#zh.wikipedia"
},
{
"type": "selector",
"dimension": "channel",
"value": "#es.wikipedia"
}
]
}
}
},
"granularity": "all",
"dimension": "renamedChannel",
"aggregations": [
{
"type": "longSum",
"name": "groupedSumAdded",
"fieldName": "sumAdded"
}
],
"intervals": [
"2015-09-12/2015-09-13"
],
"filter": {
"type": "and",
"fields": [
{
"type": "selector",
"dimension": "renamedChannel",
"value": "#zh.wikipedia"
}
]
},
"context": {
"forcePushDownNestedQuery":"true"
}
},
"expectedResults": [
{
"version" : "v1",
"timestamp" : "2015-09-12T00:00:00.000Z",
"event" : {
"groupedSumAdded" : 191033
}
}
]
},
{
"description": "Nested group by query with force push down and having clause",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": "wikiticker",
"intervals": [
"2015-09-12/2015-09-13"
],
"granularity": "all",
"dimensions": [
{"dimension" : "channel"},
{"dimension" : "user"}
],
"metric": "added",
"aggregations": [
{
"type": "longSum",
"name": "sumAdded",
"fieldName": "added"
}
]
}
},
"granularity": "all",
"aggregations": [
{
"type": "longSum",
"name": "outerSum",
"fieldName": "sumAdded"
}
],
"intervals": [
"2015-09-12/2015-09-13"
],
"having": {
"type": "or",
"havingSpecs": [
{
"type": "greaterThan",
"aggregation": "outerSum",
"value": 9385570
}
]
},
"context": {
"forcePushDownNestedQuery":"true"
}
},
"expectedResults": [
{
"version" : "v1",
"timestamp" : "2015-09-12T00:00:00.000Z",
"event" : {
"outerSum" : 9385573
}
}
]
},
{
"description": "Nested group by query with force push down and having clause. This test asserts that the post processing was invoked.",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": "wikiticker",
"intervals": [
"2015-09-12/2015-09-13"
],
"granularity": "all",
"dimensions": [
{"dimension" : "channel"},
{"dimension" : "user"}
],
"metric": "added",
"aggregations": [
{
"type": "longSum",
"name": "sumAdded",
"fieldName": "added"
}
]
}
},
"granularity": "all",
"aggregations": [
{
"type": "longSum",
"name": "outerSum",
"fieldName": "sumAdded"
}
],
"intervals": [
"2015-09-12/2015-09-13"
],
"having": {
"type": "or",
"havingSpecs": [
{
"type": "greaterThan",
"aggregation": "outerSum",
"value": 100000000
}
]
},
"context": {
"forcePushDownNestedQuery":"true"
}
},
"expectedResults": [
]
}
]

View File

@ -114,4 +114,14 @@ public interface Query<T>
{
  return this;
}
default List<Interval> getIntervalsOfInnerMostQuery()
{
if (getDataSource() instanceof QueryDataSource) {
//noinspection unchecked
return ((QueryDataSource) getDataSource()).getQuery().getIntervalsOfInnerMostQuery();
} else {
return getIntervals();
}
}
}
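The getIntervalsOfInnerMostQuery default method above recurses through QueryDataSource wrappers until it reaches the innermost query and returns that query's intervals. A minimal sketch of the same recursion, using a hypothetical Node type in place of Druid's Query/QueryDataSource:

import java.util.Arrays;
import java.util.List;

public class InnerMostIntervalsSketch
{
  // Hypothetical stand-in for a query whose data source may be another query.
  static class Node
  {
    final Node innerQuery;        // null when the data source is a plain table
    final List<String> intervals; // this level's intervals

    Node(Node innerQuery, List<String> intervals)
    {
      this.innerQuery = innerQuery;
      this.intervals = intervals;
    }

    List<String> getIntervalsOfInnerMostQuery()
    {
      // Mirrors the default method: keep descending while the data source is a query.
      return innerQuery != null ? innerQuery.getIntervalsOfInnerMostQuery() : intervals;
    }
  }

  public static void main(String[] args)
  {
    Node inner = new Node(null, Arrays.asList("2015-09-12/2015-09-13"));
    Node outer = new Node(inner, Arrays.asList("2000/2020"));
    // Prints the inner query's intervals; CachingClusteredClient (below) uses
    // exactly these to look up segments for a pushed-down nested query.
    System.out.println(outer.getIntervalsOfInnerMostQuery());
  }
}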

View File

@ -20,6 +20,8 @@
package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import java.util.Map;
@ -39,10 +41,29 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
  DataSource dataSource = queryPlus.getQuery().getDataSource();
  boolean forcePushDownNestedQuery = queryPlus.getQuery()
      .getContextBoolean(
          GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY,
          false
      );
  if (dataSource instanceof QueryDataSource && !forcePushDownNestedQuery) {
    return run(queryPlus.withQuery((Query<T>) ((QueryDataSource) dataSource).getQuery()), responseContext);
  } else {
    QueryPlus newQuery = queryPlus;
    if (forcePushDownNestedQuery) {
      // Disable any more push downs before firing off the query. But do let the historical know
      // that it is executing the complete nested query and not just the inner most part of it
      newQuery = queryPlus.withQuery(
          queryPlus.getQuery()
              .withOverriddenContext(
                  ImmutableMap.of(
                      GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false,
                      GroupByQueryConfig.CTX_KEY_EXECUTING_NESTED_QUERY, true
                  )
              )
      );
    }
    return baseRunner.run(newQuery, responseContext);
  }
}
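In short: when the force flag is set, SubqueryQueryRunner no longer unwraps the subquery on the broker; it forwards the whole nested query downstream with the force flag cleared (so nothing pushes down a second time) and executingNestedQuery set (so the historical knows it holds the complete nested query). A hedged sketch of that context rewrite over plain maps; the real code goes through Query.withOverriddenContext:

import java.util.HashMap;
import java.util.Map;

public class ContextRewriteSketch
{
  // Same key strings as GroupByQueryConfig defines below.
  static Map<String, Object> rewriteForDownstream(Map<String, Object> context)
  {
    Map<String, Object> rewritten = new HashMap<>(context);
    rewritten.put("forcePushDownNestedQuery", false); // stop any further push downs
    rewritten.put("executingNestedQuery", true);      // mark full nested execution
    return rewritten;
  }

  public static void main(String[] args)
  {
    Map<String, Object> ctx = new HashMap<>();
    ctx.put("forcePushDownNestedQuery", true);
    // {forcePushDownNestedQuery=false, executingNestedQuery=true}
    System.out.println(rewriteForDownstream(ctx));
  }
}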

View File

@ -29,6 +29,8 @@ public class GroupByQueryConfig
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown";
public static final String CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY = "forcePushDownNestedQuery";
public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
@ -74,6 +76,9 @@ public class GroupByQueryConfig
@JsonProperty
private boolean forcePushDownLimit = false;
@JsonProperty
private boolean forcePushDownNestedQuery = false;
@JsonProperty
private boolean forceHashAggregation = false;
@ -163,6 +168,11 @@ public class GroupByQueryConfig
return numParallelCombineThreads;
}
public boolean isForcePushDownNestedQuery()
{
  return forcePushDownNestedQuery;
}
public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
  final GroupByQueryConfig newConfig = new GroupByQueryConfig();
@ -198,6 +208,7 @@ public class GroupByQueryConfig
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery());
newConfig.intermediateCombineDegree = query.getContextValue(
    CTX_KEY_INTERMEDIATE_COMBINE_DEGREE,
    getIntermediateCombineDegree()
@ -226,6 +237,7 @@ public class GroupByQueryConfig
", forceHashAggregation=" + forceHashAggregation +
", intermediateCombineDegree=" + intermediateCombineDegree +
", numParallelCombineThreads=" + numParallelCombineThreads +
", forcePushDownNestedQuery=" + forcePushDownNestedQuery +
'}';
}
}

View File

@ -149,6 +149,20 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
    QueryRunner<Row> runner,
    Map<String, Object> context
)
{
if (isNestedQueryPushDown(query, groupByStrategy)) {
return mergeResultsWithNestedQueryPushDown(groupByStrategy, query, resource, runner, context);
}
return mergeGroupByResultsWithoutPushDown(groupByStrategy, query, resource, runner, context);
}
private Sequence<Row> mergeGroupByResultsWithoutPushDown(
GroupByStrategy groupByStrategy,
GroupByQuery query,
GroupByQueryResource resource,
QueryRunner<Row> runner,
Map<String, Object> context
)
{
  // If there's a subquery, merge subquery results and then apply the aggregator
@ -192,31 +206,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
      context
  );
  final Sequence<Row> finalizingResults = finalizeSubqueryResults(subqueryResult, subquery);
  if (query.getSubtotalsSpec() != null) {
    return groupByStrategy.processSubtotalsSpec(
        query,
        resource,
        groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults, false)
    );
  } else {
    return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult(
        subquery,
        query,
        resource,
        finalizingResults,
        false
    ), query);
  }
@ -233,6 +237,69 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
  }
}
private Sequence<Row> mergeResultsWithNestedQueryPushDown(
GroupByStrategy groupByStrategy,
GroupByQuery query,
GroupByQueryResource resource,
QueryRunner<Row> runner,
Map<String, Object> context
)
{
Sequence<Row> pushDownQueryResults = groupByStrategy.mergeResults(runner, query, context);
final Sequence<Row> finalizedResults = finalizeSubqueryResults(pushDownQueryResults, query);
GroupByQuery rewrittenQuery = rewriteNestedQueryForPushDown(query);
return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult(
query,
rewrittenQuery,
resource,
finalizedResults,
true
), query);
}
/**
 * Rewrite the aggregator and dimension specs, since the pushed-down nested query will return
 * results with the dimension and aggregation specs of the original nested query.
 */
@VisibleForTesting
GroupByQuery rewriteNestedQueryForPushDown(GroupByQuery query)
{
return query.withAggregatorSpecs(Lists.transform(query.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory()))
.withDimensionSpecs(Lists.transform(
query.getDimensions(),
(dim) -> new DefaultDimensionSpec(
dim.getOutputName(),
dim.getOutputName(),
dim.getOutputType()
)
));
}
private Sequence<Row> finalizeSubqueryResults(Sequence<Row> subqueryResult, GroupByQuery subquery)
{
final Sequence<Row> finalizingResults;
if (QueryContexts.isFinalize(subquery, false)) {
finalizingResults = new MappedSequence<>(
subqueryResult,
makePreComputeManipulatorFn(
subquery,
MetricManipulatorFns.finalizing()
)::apply
);
} else {
finalizingResults = subqueryResult;
}
return finalizingResults;
}
public static boolean isNestedQueryPushDown(GroupByQuery q, GroupByStrategy strategy)
{
return q.getDataSource() instanceof QueryDataSource
&& q.getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false)
&& q.getSubtotalsSpec() == null
&& strategy.supportsNestedQueryPushDown();
}
@Override
public GroupByQueryMetrics makeMetrics(GroupByQuery query)
{
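The rewriteNestedQueryForPushDown step above matters because pushed-down results arrive keyed by the outer query's output names rather than the raw input columns, so the broker must re-aggregate with combining aggregators. For a longSum this amounts to renaming the input field to the aggregator's own output name, which is what AggregatorFactory.getCombiningFactory() does; a toy model follows (AggSpec is a hypothetical stand-in, not a Druid type):

public class CombiningFactorySketch
{
  // Hypothetical minimal model of an aggregator spec: output name + input field.
  static class AggSpec
  {
    final String name;
    final String fieldName;

    AggSpec(String name, String fieldName)
    {
      this.name = name;
      this.fieldName = fieldName;
    }

    // Mirrors getCombiningFactory(): combine partial results under the output name.
    AggSpec combining()
    {
      return new AggSpec(name, name);
    }
  }

  public static void main(String[] args)
  {
    AggSpec original = new AggSpec("groupedSumAdded", "sumAdded");
    AggSpec rewritten = original.combining();
    // The broker-side merge now reads the column the pushed-down results actually carry.
    System.out.println(rewritten.name + " sums over " + rewritten.fieldName);
  }
}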

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.groupby;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
@ -108,4 +109,10 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
  return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
  }
}
@VisibleForTesting
public GroupByStrategySelector getStrategySelector()
{
return strategySelector;
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.SettableSupplier;
@ -65,7 +64,8 @@ public class GroupByRowProcessor
    final ObjectMapper spillMapper,
    final String processingTmpDir,
    final int mergeBufferSize,
    final List<Closeable> closeOnExit,
    final boolean wasQueryPushedDown
)
{
  final GroupByQuery query = (GroupByQuery) queryParam;
@ -76,50 +76,16 @@ public class GroupByRowProcessor
    aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
  }
  final File temporaryStorageDirectory = new File(
      processingTmpDir,
      StringUtils.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
  );
  Sequence<Row> sequenceToGroup = rows;
  // When query is pushed down, rows have already been filtered
  if (!wasQueryPushedDown) {
    sequenceToGroup = getFilteredSequence(rows, rowSignature, query);
  }
  final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
      temporaryStorageDirectory,
@ -152,7 +118,8 @@ public class GroupByRowProcessor
  final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
  closeOnExit.add(grouper);
  final AggregateResult retVal = sequenceToGroup.accumulate(AggregateResult.ok(), accumulator);
  if (!retVal.isOk()) {
    throw new ResourceLimitExceededException(retVal.getReason());
  }
@ -160,6 +127,47 @@ public class GroupByRowProcessor
  return grouper;
}
private static Sequence<Row> getFilteredSequence(
Sequence<Row> rows,
Map<String, ValueType> rowSignature,
GroupByQuery query
)
{
final List<Interval> queryIntervals = query.getIntervals();
final Filter filter = Filters.convertToCNFFromQueryContext(
query,
Filters.toFilter(query.getDimFilter())
);
final SettableSupplier<Row> rowSupplier = new SettableSupplier<>();
final RowBasedColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create(
rowSupplier,
rowSignature
);
final ValueMatcher filterMatcher = filter == null
? BooleanValueMatcher.of(true)
: filter.makeMatcher(columnSelectorFactory);
return new FilteredSequence<>(
rows,
input -> {
boolean inInterval = false;
DateTime rowTime = input.getTimestamp();
for (Interval queryInterval : queryIntervals) {
if (queryInterval.contains(rowTime)) {
inInterval = true;
break;
}
}
if (!inInterval) {
return false;
}
rowSupplier.set(input);
return filterMatcher.matches();
}
);
}
public static Sequence<Row> getRowsFromGrouper(GroupByQuery query, List<String> subtotalSpec, Supplier<Grouper> grouper)
{
  return new BaseSequence<>(
@ -183,5 +191,6 @@ public class GroupByRowProcessor
      }
    }
  );
  }
}
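The refactor above encodes one behavioral rule: broker-side re-grouping re-applies the interval and dimension filters only when the rows did not come from a pushed-down query, since pushed-down rows were already filtered on the historicals. A small sketch of the interval-containment check used in getFilteredSequence, with plain epoch millis standing in for Joda intervals:

import java.util.Arrays;
import java.util.List;

public class IntervalFilterSketch
{
  // Hypothetical [start, end) interval over epoch millis.
  static class Interval
  {
    final long start;
    final long end;

    Interval(long start, long end)
    {
      this.start = start;
      this.end = end;
    }

    boolean contains(long t)
    {
      return t >= start && t < end;
    }
  }

  static boolean inAnyInterval(long rowTime, List<Interval> intervals)
  {
    for (Interval i : intervals) {
      if (i.contains(rowTime)) {
        return true; // same early-exit loop as in getFilteredSequence
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    List<Interval> intervals = Arrays.asList(new Interval(0, 100), new Interval(200, 300));
    System.out.println(inAnyInterval(50, intervals));  // true
    System.out.println(inAnyInterval(150, intervals)); // false
  }
}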

View File

@ -82,7 +82,8 @@ public interface GroupByStrategy
    GroupByQuery subquery,
    GroupByQuery query,
    GroupByQueryResource resource,
    Sequence<Row> subqueryResult,
    boolean wasQueryPushedDown
);
Sequence<Row> processSubtotalsSpec(
@ -100,4 +101,6 @@ public interface GroupByStrategy
    GroupByQuery query,
    StorageAdapter storageAdapter
);
boolean supportsNestedQueryPushDown();
}

View File

@ -159,7 +159,11 @@ public class GroupByStrategyV1 implements GroupByStrategy
@Override
public Sequence<Row> processSubqueryResult(
    GroupByQuery subquery,
    GroupByQuery query,
    GroupByQueryResource resource,
    Sequence<Row> subqueryResult,
    boolean wasQueryPushedDown
)
{
  final Set<AggregatorFactory> aggs = Sets.newHashSet();
@ -287,4 +291,10 @@ public class GroupByStrategyV1 implements GroupByStrategy
{
  return engine.process(query, storageAdapter);
}
@Override
public boolean supportsNestedQueryPushDown()
{
return false;
}
}

View File

@ -277,6 +277,16 @@ public class GroupByStrategyV2 implements GroupByStrategy
@Override
public Row apply(final Row row)
{
if (query.getContextBoolean(GroupByQueryConfig.CTX_KEY_EXECUTING_NESTED_QUERY, false)) {
// When executing nested queries, we need to make sure that we are
// extracting the event out of the row. Post aggregators are not invoked, since
// they should only be applied after merging all the nested query responses. The
// timestamp, if it needs to be fudged, can safely be fudged here.
return new MapBasedRow(
fudgeTimestamp != null ? fudgeTimestamp : row.getTimestamp(),
((MapBasedRow) row).getEvent()
);
}
// Apply postAggregators and fudgeTimestamp if present and if this is the outermost mergeResults.
if (!query.getContextBoolean(CTX_KEY_OUTERMOST, true)) {
@ -323,7 +333,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
    GroupByQuery subquery,
    GroupByQuery query,
    GroupByQueryResource resource,
    Sequence<Row> subqueryResult,
    boolean wasQueryPushedDown
)
{
  // This contains all closeable objects which are closed when the returned iterator iterates all the elements,
@ -335,13 +346,14 @@ public class GroupByStrategyV2 implements GroupByStrategy
    () -> GroupByRowProcessor.createGrouper(
        query,
        subqueryResult,
        GroupByQueryHelper.rowSignatureFor(wasQueryPushedDown ? query : subquery),
        configSupplier.get(),
        resource,
        spillMapper,
        processingConfig.getTmpDir(),
        processingConfig.intermediateComputeSizeBytes(),
        closeOnExit,
        wasQueryPushedDown
    )
);
@ -403,7 +415,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
        spillMapper,
        processingConfig.getTmpDir(),
        processingConfig.intermediateComputeSizeBytes(),
        closeOnExit,
        false
    )
);
List<Sequence<Row>> subtotalsResults = new ArrayList<>(subtotals.size());
@ -476,4 +489,10 @@ public class GroupByStrategyV2 implements GroupByStrategy
{
  return GroupByQueryEngineV2.process(query, storageAdapter, bufferPool, configSupplier.get().withOverrides(query));
}
@Override
public boolean supportsNestedQueryPushDown()
{
return true;
}
}
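The apply(Row) change near the top of this file means a historical executing the complete nested query returns each row's event untouched: post aggregators are deferred to the broker, which applies them only after merging responses from all historicals, and only fudgeTimestamp (when set) replaces the row's timestamp. A sketch of that branch, with a plain map standing in for MapBasedRow:

import java.util.HashMap;
import java.util.Map;

public class NestedRowPassThroughSketch
{
  // Hypothetical stand-in for MapBasedRow: a timestamp plus an event map.
  static class Row
  {
    final long timestamp;
    final Map<String, Object> event;

    Row(long timestamp, Map<String, Object> event)
    {
      this.timestamp = timestamp;
      this.event = event;
    }
  }

  static Row passThrough(Row row, Long fudgeTimestamp)
  {
    // Mirrors the executingNestedQuery branch: keep the event untouched,
    // apply fudgeTimestamp only if present. No post aggregation here.
    return new Row(fudgeTimestamp != null ? fudgeTimestamp : row.timestamp, row.event);
  }

  public static void main(String[] args)
  {
    Map<String, Object> event = new HashMap<>();
    event.put("sumAdded", 42L);
    Row out = passThrough(new Row(1000L, event), 0L);
    System.out.println(out.timestamp + " " + out.event); // 0 {sumAdded=42}
  }
}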

View File

@ -217,6 +217,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final int uncoveredIntervalsLimit;
private final Query<T> downstreamQuery;
private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = Maps.newHashMap();
private final List<Interval> intervals;
SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
@ -233,6 +234,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
// and might blow up in some cases https://github.com/apache/incubator-druid/issues/2108
this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query);
this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext());
// For nested queries, we need to look at the intervals of the innermost query.
this.intervals = query.getIntervalsOfInnerMostQuery();
}
private ImmutableMap<String, Object> makeDownstreamQueryContext()
@ -291,7 +294,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
{
  final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
      query,
      intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
  );
  final Set<ServerToSegment> segments = Sets.newLinkedHashSet();
@ -322,7 +325,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
final List<Interval> uncoveredIntervals = new ArrayList<>(uncoveredIntervalsLimit);
boolean uncoveredIntervalsOverflowed = false;
for (Interval interval : intervals) {
  Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
  long startMillis = interval.getStartMillis();
  long endMillis = interval.getEndMillis();

View File

@ -42,6 +42,7 @@ import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.PerSegmentOptimizingQueryRunner;
import org.apache.druid.query.PerSegmentQueryOptimizationContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -110,6 +111,14 @@ public class ServerManager implements QuerySegmentWalker
  this.serverConfig = serverConfig;
}
private DataSource getInnerMostDataSource(DataSource dataSource)
{
if (dataSource instanceof QueryDataSource) {
return getInnerMostDataSource(((QueryDataSource) dataSource).getQuery().getDataSource());
}
return dataSource;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
@ -121,7 +130,7 @@ public class ServerManager implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
DataSource dataSource = getInnerMostDataSource(query.getDataSource());
if (!(dataSource instanceof TableDataSource)) {
  throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
}