Don't use limit push down with having spec (#4630)

* Don't use limit push down with having spec

* Throw exception when forcing limit push down with having

* Tests for having and limit push down

* Fix pool sizes in unit test
Authored by Jonathan Wei on 2017-08-04 15:13:29 -07:00; committed by Gian Merlino
parent 7a005088d9
commit 9650d80f3e
5 changed files with 502 additions and 1 deletion
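
For illustration, a minimal sketch of the combination that is now rejected at construction time. This is built from the same query pieces the tests below use; the class name, datasource, and column names are placeholders, not part of this change.

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.Collections;

public class ForceLimitPushDownWithHavingSketch
{
  public static void main(String[] args)
  {
    try {
      GroupByQuery.builder()
          .setDataSource("blah")  // placeholder datasource
          .setQuerySegmentSpec(
              new MultipleIntervalSegmentSpec(Collections.singletonList(new Interval(0, 1000000)))
          )
          .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("dimA", null)))
          .setAggregatorSpecs(
              Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("metA", "metA"))
          )
          .setLimitSpec(
              new DefaultLimitSpec(
                  Arrays.asList(new OrderByColumnSpec("dimA", OrderByColumnSpec.Direction.ASCENDING)),
                  1
              )
          )
          .setHavingSpec(new GreaterThanHavingSpec("metA", 110))
          // forcing limit push down while a having spec is present is the combination rejected below
          .setContext(
              ImmutableMap.<String, Object>of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)
          )
          .setGranularity(Granularities.ALL)
          .build();
    }
    catch (IllegalArgumentException e) {
      // IAE extends IllegalArgumentException; the message is
      // "Cannot force limit push down when a having spec is present."
      System.out.println(e.getMessage());
    }
  }
}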

GroupByQuery.java

@@ -317,6 +317,12 @@ public class GroupByQuery extends BaseQuery<Row>
return applyLimitPushDown;
}
@JsonIgnore
public boolean getApplyLimitPushDownFromContext()
{
return getContextBoolean(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true);
}
@Override
public Ordering getResultOrdering()
{
@@ -346,6 +352,10 @@ public class GroupByQuery extends BaseQuery<Row>
throw new IAE("When forcing limit push down, the provided limit spec must have a limit.");
}
if (havingSpec != null) {
throw new IAE("Cannot force limit push down when a having spec is present.");
}
for (OrderByColumnSpec orderBySpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
if (OrderByColumnSpec.getPostAggIndexForOrderBy(orderBySpec, postAggregatorSpecs) > -1) {
throw new UnsupportedOperationException("Limit push down when sorting by a post aggregator is not supported.");
@@ -371,6 +381,14 @@ public class GroupByQuery extends BaseQuery<Row>
return true;
}
if (!getApplyLimitPushDownFromContext()) {
return false;
}
if (havingSpec != null) {
return false;
}
// If the sorting order only uses columns in the grouping key, we can always push the limit down
// to the buffer grouper without affecting result accuracy
boolean sortHasNonGroupingFields = DefaultLimitSpec.sortingOrderHasNonGroupingFields(

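Distilled from the hunk above, the push-down decision now runs through the following order of checks. This is a sketch with plain booleans standing in for the real query state, not the actual isApplyLimitPushDown() body, which the hunk only shows part of.

public class LimitPushDownDecisionSketch
{
  /**
   * Sketch of the decision order shown in the GroupByQuery hunk above.
   * The parameters stand in for the real query fields and are assumptions of this sketch.
   */
  public static boolean applyLimitPushDown(
      boolean forceLimitPushDown,       // "forceLimitPushDown" context flag; validated at construction (limit present, no having spec)
      boolean contextAllowsPushDown,    // new "applyLimitPushDown" context flag, defaults to true
      boolean hasHavingSpec,
      boolean sortHasNonGroupingFields
  )
  {
    if (forceLimitPushDown) {
      return true;
    }
    if (!contextAllowsPushDown) {
      return false;
    }
    if (hasHavingSpec) {
      // a having spec filters the merged rows, so trimming rows early could drop
      // rows that would have survived the having filter
      return false;
    }
    // if the sort only uses grouping-key columns, the limit can be pushed down to the
    // buffer grouper without affecting result accuracy
    return !sortHasNonGroupingFields;
  }
}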
GroupByQueryConfig.java

@@ -28,6 +28,7 @@ public class GroupByQueryConfig
{
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";

GroupByStrategyV2.java

@@ -246,7 +246,9 @@ public class GroupByStrategyV2 implements GroupByStrategy
"finalize", false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()),
-CTX_KEY_OUTERMOST, false
+CTX_KEY_OUTERMOST, false,
+// the having spec shouldn't be passed down, so we need to convey the existing limit push down status
+GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, query.isApplyLimitPushDown()
)
);
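
As the inline comment notes, the having spec is not passed down to the per-segment queries, so the outer query's push-down status has to travel in the query context. A minimal sketch of that propagation, assuming Query.withOverriddenContext is available (it is not shown in this hunk; the real change simply adds the entry to the context map built above):

import com.google.common.collect.ImmutableMap;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;

public class ConveyPushDownStatusSketch
{
  /**
   * Copies the outer query's already-computed limit push down status into the context of the
   * query that will be sent without its having spec, so the inner query cannot re-enable
   * push down on its own. Query.withOverriddenContext is assumed here for brevity.
   */
  public static GroupByQuery conveyPushDownStatus(GroupByQuery outerQuery)
  {
    return (GroupByQuery) outerQuery.withOverriddenContext(
        ImmutableMap.<String, Object>of(
            GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN,
            outerQuery.isApplyLimitPushDown()
        )
    );
  }
}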

GroupByMultiSegmentTest.java (new file)

@@ -0,0 +1,434 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
public class GroupByMultiSegmentTest
{
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private File tmpDir;
private QueryRunnerFactory<Row, GroupByQuery> groupByFactory;
private List<IncrementalIndex> incrementalIndices = Lists.newArrayList();
private List<QueryableIndex> groupByIndices = Lists.newArrayList();
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
JSON_MAPPER.setInjectableValues(
new InjectableValues.Std().addValue(
ExprMacroTable.class,
ExprMacroTable.nil()
)
);
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
private IncrementalIndex makeIncIndex(boolean withRollup)
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dimA"),
new LongDimensionSchema("metA")
),
null,
null
))
.withRollup(withRollup)
.build()
)
.setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
}
@Before
public void setup() throws Exception
{
tmpDir = Files.createTempDir();
InputRow row;
List<String> dimNames = Arrays.asList("dimA", "metA");
Map<String, Object> event;
final IncrementalIndex indexA = makeIncIndex(false);
incrementalIndices.add(indexA);
event = new HashMap<>();
event.put("dimA", "hello");
event.put("metA", 100);
row = new MapBasedInputRow(1000, dimNames, event);
indexA.add(row);
event = new HashMap<>();
event.put("dimA", "world");
event.put("metA", 75);
row = new MapBasedInputRow(1000, dimNames, event);
indexA.add(row);
final File fileA = INDEX_MERGER_V9.persist(
indexA,
new File(tmpDir, "A"),
new IndexSpec()
);
QueryableIndex qindexA = INDEX_IO.loadIndex(fileA);
final IncrementalIndex indexB = makeIncIndex(false);
incrementalIndices.add(indexB);
event = new HashMap<>();
event.put("dimA", "foo");
event.put("metA", 100);
row = new MapBasedInputRow(1000, dimNames, event);
indexB.add(row);
event = new HashMap<>();
event.put("dimA", "world");
event.put("metA", 75);
row = new MapBasedInputRow(1000, dimNames, event);
indexB.add(row);
final File fileB = INDEX_MERGER_V9.persist(
indexB,
new File(tmpDir, "B"),
new IndexSpec()
);
QueryableIndex qindexB = INDEX_IO.loadIndex(fileB);
groupByIndices = Arrays.asList(qindexA, qindexB);
setupGroupByFactory();
}
private void setupGroupByFactory()
{
executorService = Execs.multiThreaded(2, "GroupByThreadPool[%d]");
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
Integer.MAX_VALUE
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return "v2";
}
@Override
public int getBufferGrouperInitialBuckets()
{
return -1;
}
@Override
public long getMaxOnDiskStorage()
{
return 1_000_000_000L;
}
};
config.setSingleThreaded(false);
config.setMaxIntermediateRows(Integer.MAX_VALUE);
config.setMaxResults(Integer.MAX_VALUE);
DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
{
@Override
public int getNumThreads()
{
// Used by "v2" strategy for concurrencyHint
return 2;
}
@Override
public String getFormatString()
{
return null;
}
};
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
);
groupByFactory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
NoopIntervalChunkingQueryRunnerDecorator()
)
);
}
@After
public void tearDown() throws Exception
{
for (IncrementalIndex incrementalIndex : incrementalIndices) {
incrementalIndex.close();
}
for (QueryableIndex queryableIndex : groupByIndices) {
queryableIndex.close();
}
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
}
@Test
public void testHavingAndNoLimitPushDown() throws Exception
{
QueryToolChest<Row, GroupByQuery> toolChest = groupByFactory.getToolchest();
QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
groupByFactory.mergeRunners(executorService, makeGroupByMultiRunners())
),
(QueryToolChest) toolChest
);
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(new Interval(0, 1000000)));
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("dimA", null)
))
.setAggregatorSpecs(
Arrays.asList(new LongSumAggregatorFactory("metA", "metA"))
)
.setLimitSpec(
new DefaultLimitSpec(
Arrays.asList(new OrderByColumnSpec("dimA", OrderByColumnSpec.Direction.ASCENDING)),
1
)
)
.setHavingSpec(
new GreaterThanHavingSpec("metA", 110)
)
.setGranularity(Granularities.ALL)
.build();
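// Note on the fixture: "world" appears in both segments with metA = 75, so the merged row has
// metA = 150 and is the only row passing the having filter (metA > 110). If the limit of 1 had
// been pushed down to the per-segment groupers, each segment would return only its first row by
// dimA ("foo" and "hello"), and no row would survive the having filter after merging.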
Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
Row expectedRow = GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"dimA", "world",
"metA", 150L
);
Assert.assertEquals(1, results.size());
Assert.assertEquals(expectedRow, results.get(0));
}
private List<QueryRunner<Row>> makeGroupByMultiRunners()
{
List<QueryRunner<Row>> runners = Lists.newArrayList();
for (QueryableIndex qindex : groupByIndices) {
QueryRunner<Row> runner = makeQueryRunner(
groupByFactory,
qindex.toString(),
new QueryableIndexSegment(qindex.toString(), qindex)
);
runners.add(groupByFactory.getToolchest().preMergeQueryDecoration(runner));
}
return runners;
}
private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferGenerator.class);
private final String description;
private final int computationBufferSize;
private final AtomicLong count = new AtomicLong(0);
public OffheapBufferGenerator(String description, int computationBufferSize)
{
this.description = description;
this.computationBufferSize = computationBufferSize;
}
@Override
public ByteBuffer get()
{
log.info(
"Allocating new %s buffer[%,d] of size[%,d]",
description,
count.getAndIncrement(),
computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
String segmentId,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
(QueryToolChest<T, Query<T>>)factory.getToolchest()
);
}
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new QueryRunner<T>() {
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return delegate.run(queryPlus, responseContext);
}
};
}
};
}
}

GroupByQueryRunnerTest.java

@@ -9198,6 +9198,52 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testRejectForceLimitPushDownWithHaving()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Cannot force limit push down when a having spec is present.");
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
"marketalias"
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
"marketalias",
OrderByColumnSpec.Direction.DESCENDING
)
),
2
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.rowsCount
)
)
.setContext(
ImmutableMap.<String, Object>of(
GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN,
true
)
)
.setHavingSpec(
new GreaterThanHavingSpec("rows", 10)
)
.build();
}
@Test
public void testTypeConversionWithMergingChainedExecutionRunner()
{