From 9650d80f3e38d53b8be2cf79695c5c66c8cb6e34 Mon Sep 17 00:00:00 2001
From: Jonathan Wei
Date: Fri, 4 Aug 2017 15:13:29 -0700
Subject: [PATCH] Don't use limit push down with having spec (#4630)

* Don't use limit push down with having spec

* Throw exception when forcing limit push down with having

* Tests for having and limit push down

* Fix pool sizes in unit test
---
 .../io/druid/query/groupby/GroupByQuery.java  |  18 +
 .../query/groupby/GroupByQueryConfig.java     |   1 +
 .../groupby/strategy/GroupByStrategyV2.java   |   4 +-
 .../groupby/GroupByMultiSegmentTest.java      | 434 ++++++++++++++++++
 .../query/groupby/GroupByQueryRunnerTest.java |  46 ++
 5 files changed, 502 insertions(+), 1 deletion(-)
 create mode 100644 processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java

diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 406ba459db2..73bc86f5ac3 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -317,6 +317,12 @@ public class GroupByQuery extends BaseQuery<Row>
     return applyLimitPushDown;
   }
 
+  @JsonIgnore
+  public boolean getApplyLimitPushDownFromContext()
+  {
+    return getContextBoolean(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true);
+  }
+
   @Override
   public Ordering<Row> getResultOrdering()
   {
@@ -346,6 +352,10 @@ public class GroupByQuery extends BaseQuery<Row>
         throw new IAE("When forcing limit push down, the provided limit spec must have a limit.");
       }
 
+      if (havingSpec != null) {
+        throw new IAE("Cannot force limit push down when a having spec is present.");
+      }
+
       for (OrderByColumnSpec orderBySpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
         if (OrderByColumnSpec.getPostAggIndexForOrderBy(orderBySpec, postAggregatorSpecs) > -1) {
           throw new UnsupportedOperationException("Limit push down when sorting by a post aggregator is not supported.");
@@ -371,6 +381,14 @@ public class GroupByQuery extends BaseQuery<Row>
       return true;
     }
 
+    if (!getApplyLimitPushDownFromContext()) {
+      return false;
+    }
+
+    if (havingSpec != null) {
+      return false;
+    }
+
     // If the sorting order only uses columns in the grouping key, we can always push the limit down
     // to the buffer grouper without affecting result accuracy
     boolean sortHasNonGroupingFields = DefaultLimitSpec.sortingOrderHasNonGroupingFields(
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
index 230acbd0a07..85d8b5216ca 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
@@ -28,6 +28,7 @@ public class GroupByQueryConfig
 {
   public static final String CTX_KEY_STRATEGY = "groupByStrategy";
   public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
+  public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown";
   private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
   private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
   private static final String CTX_KEY_MAX_RESULTS = "maxResults";
diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java
index 6381fdebd32..cabbaf0ea32 100644
--- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -246,7 +246,9 @@ public class GroupByStrategyV2 implements GroupByStrategy
             "finalize", false,
             GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
             CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()),
-            CTX_KEY_OUTERMOST, false
+            CTX_KEY_OUTERMOST, false,
+            // the having spec shouldn't be passed down, so we need to convey the existing limit push down status
+            GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, query.isApplyLimitPushDown()
         )
     );
 
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java
new file mode 100644
index 00000000000..ecae130b733
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.groupby;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.collections.BlockingPool;
+import io.druid.collections.DefaultBlockingPool;
+import io.druid.collections.NonBlockingPool;
+import io.druid.collections.StupidPool;
+import io.druid.concurrent.Execs;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.data.input.Row;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.LongDimensionSchema;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.math.expr.ExprMacroTable;
+import io.druid.query.BySegmentQueryRunner;
+import io.druid.query.DruidProcessingConfig;
+import io.druid.query.FinalizeResultsQueryRunner;
+import io.druid.query.IntervalChunkingQueryRunnerDecorator;
+import io.druid.query.Query;
+import io.druid.query.QueryPlus;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryToolChest;
+import io.druid.query.QueryWatcher;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.having.GreaterThanHavingSpec;
+import io.druid.query.groupby.orderby.DefaultLimitSpec;
+import io.druid.query.groupby.orderby.OrderByColumnSpec;
+import io.druid.query.groupby.strategy.GroupByStrategySelector;
+import io.druid.query.groupby.strategy.GroupByStrategyV1;
+import io.druid.query.groupby.strategy.GroupByStrategyV2;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.QuerySegmentSpec;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.QueryableIndexSegment;
+import io.druid.segment.Segment;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GroupByMultiSegmentTest
+{
+  private static final IndexMergerV9 INDEX_MERGER_V9;
+  private static final IndexIO INDEX_IO;
+  public static final ObjectMapper JSON_MAPPER;
+
+  private File tmpDir;
+  private QueryRunnerFactory<Row, GroupByQuery> groupByFactory;
+  private List<IncrementalIndex> incrementalIndices = Lists.newArrayList();
+  private List<QueryableIndex> groupByIndices = Lists.newArrayList();
+  private ExecutorService executorService;
+
+  static {
+    JSON_MAPPER = new DefaultObjectMapper();
+    JSON_MAPPER.setInjectableValues(
+        new InjectableValues.Std().addValue(
+            ExprMacroTable.class,
+            ExprMacroTable.nil()
+        )
+    );
+    INDEX_IO = new IndexIO(
+        JSON_MAPPER,
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0;
+          }
+        }
+    );
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
+  }
+
+  private IncrementalIndex makeIncIndex(boolean withRollup)
+  {
+    return new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withDimensionsSpec(new DimensionsSpec(
+                    Arrays.asList(
+                        new StringDimensionSchema("dimA"),
+                        new LongDimensionSchema("metA")
+                    ),
+                    null,
+                    null
+                ))
+                .withRollup(withRollup)
+                .build()
+        )
+        .setReportParseExceptions(false)
+        .setConcurrentEventAdd(true)
+        .setMaxRowCount(1000)
+        .buildOnheap();
+  }
+
+  @Before
+  public void setup() throws Exception
+  {
+    tmpDir = Files.createTempDir();
+
+    InputRow row;
+    List<String> dimNames = Arrays.asList("dimA", "metA");
+    Map<String, Object> event;
+
+    final IncrementalIndex indexA = makeIncIndex(false);
+    incrementalIndices.add(indexA);
+    event = new HashMap<>();
+    event.put("dimA", "hello");
+    event.put("metA", 100);
+    row = new MapBasedInputRow(1000, dimNames, event);
+    indexA.add(row);
+    event = new HashMap<>();
+    event.put("dimA", "world");
+    event.put("metA", 75);
+    row = new MapBasedInputRow(1000, dimNames, event);
+    indexA.add(row);
+    final File fileA = INDEX_MERGER_V9.persist(
+        indexA,
+        new File(tmpDir, "A"),
+        new IndexSpec()
+    );
+    QueryableIndex qindexA = INDEX_IO.loadIndex(fileA);
+
+    final IncrementalIndex indexB = makeIncIndex(false);
+    incrementalIndices.add(indexB);
+    event = new HashMap<>();
+    event.put("dimA", "foo");
+    event.put("metA", 100);
+    row = new MapBasedInputRow(1000, dimNames, event);
+    indexB.add(row);
+    event = new HashMap<>();
+    event.put("dimA", "world");
+    event.put("metA", 75);
+    row = new MapBasedInputRow(1000, dimNames, event);
+    indexB.add(row);
+
+    final File fileB = INDEX_MERGER_V9.persist(
+        indexB,
+        new File(tmpDir, "B"),
+        new IndexSpec()
+    );
+    QueryableIndex qindexB = INDEX_IO.loadIndex(fileB);
+
+    groupByIndices = Arrays.asList(qindexA, qindexB);
+    setupGroupByFactory();
+  }
+
+  private void setupGroupByFactory()
+  {
+    executorService = Execs.multiThreaded(2, "GroupByThreadPool[%d]");
+
+    NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
+        "GroupByBenchmark-computeBufferPool",
+        new OffheapBufferGenerator("compute", 10_000_000),
+        0,
+        Integer.MAX_VALUE
+    );
+
+    // limit of 2 is required since we simulate both historical merge and broker merge in the same process
+    BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
+        new OffheapBufferGenerator("merge", 10_000_000),
+        2
+    );
+    final GroupByQueryConfig config = new GroupByQueryConfig()
+    {
+      @Override
+      public String getDefaultStrategy()
+      {
+        return "v2";
+      }
+
+      @Override
+      public int getBufferGrouperInitialBuckets()
+      {
+        return -1;
+      }
+
+      @Override
+      public long getMaxOnDiskStorage()
+      {
+        return 1_000_000_000L;
+      }
+    };
+    config.setSingleThreaded(false);
+    config.setMaxIntermediateRows(Integer.MAX_VALUE);
+    config.setMaxResults(Integer.MAX_VALUE);
+
+    DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
+    {
+      @Override
+      public int getNumThreads()
+      {
+        // Used by "v2" strategy for concurrencyHint
+        return 2;
+      }
+
+      @Override
+      public String getFormatString()
+      {
+        return null;
+      }
+    };
+
+    final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
+    final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
+        configSupplier,
+        new GroupByStrategyV1(
+            configSupplier,
+            new GroupByQueryEngine(configSupplier, bufferPool),
+            NOOP_QUERYWATCHER,
+            bufferPool
+        ),
+        new GroupByStrategyV2(
+            druidProcessingConfig,
+            configSupplier,
+            bufferPool,
+            mergePool,
+            new ObjectMapper(new SmileFactory()),
+            NOOP_QUERYWATCHER
+        )
+    );
+
+    groupByFactory = new GroupByQueryRunnerFactory(
+        strategySelector,
+        new GroupByQueryQueryToolChest(
+            strategySelector,
+            NoopIntervalChunkingQueryRunnerDecorator()
+        )
+    );
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    for (IncrementalIndex incrementalIndex : incrementalIndices) {
+      incrementalIndex.close();
+    }
+
+    for (QueryableIndex queryableIndex : groupByIndices) {
+      queryableIndex.close();
+    }
+
+    if (tmpDir != null) {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
+  @Test
+  public void testHavingAndNoLimitPushDown() throws Exception
+  {
+    QueryToolChest<Row, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            groupByFactory.mergeRunners(executorService, makeGroupByMultiRunners())
+        ),
+        (QueryToolChest) toolChest
+    );
+    QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(new Interval(0, 1000000)));
+
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("blah")
+        .setQuerySegmentSpec(intervalSpec)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(
+            new DefaultDimensionSpec("dimA", null)
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(new LongSumAggregatorFactory("metA", "metA"))
+        )
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Arrays.asList(new OrderByColumnSpec("dimA", OrderByColumnSpec.Direction.ASCENDING)),
+                1
+            )
+        )
+        .setHavingSpec(
+            new GreaterThanHavingSpec("metA", 110)
+        )
+        .setGranularity(Granularities.ALL)
+        .build();
+
+    Sequence<Row> queryResult = theRunner.run(query, Maps.newHashMap());
+    List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
+
+    Row expectedRow = GroupByQueryRunnerTestHelper.createExpectedRow(
+        "1970-01-01T00:00:00.000Z",
+        "dimA", "world",
+        "metA", 150L
+    );
+
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals(expectedRow, results.get(0));
+  }
+
+  private List<QueryRunner<Row>> makeGroupByMultiRunners()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+
+    for (QueryableIndex qindex : groupByIndices) {
+      QueryRunner<Row> runner = makeQueryRunner(
+          groupByFactory,
+          qindex.toString(),
+          new QueryableIndexSegment(qindex.toString(), qindex)
+      );
+      runners.add(groupByFactory.getToolchest().preMergeQueryDecoration(runner));
+    }
+    return runners;
+  }
+
+  private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
+  {
+    private static final Logger log = new Logger(OffheapBufferGenerator.class);
+
+    private final String description;
+    private final int computationBufferSize;
+    private final AtomicLong count = new AtomicLong(0);
+
+    public OffheapBufferGenerator(String description, int computationBufferSize)
+    {
+      this.description = description;
+      this.computationBufferSize = computationBufferSize;
+    }
+
+    @Override
+    public ByteBuffer get()
+    {
+      log.info(
+          "Allocating new %s buffer[%,d] of size[%,d]",
+          description,
+          count.getAndIncrement(),
+          computationBufferSize
+      );
+
+      return ByteBuffer.allocateDirect(computationBufferSize);
+    }
+  }
+
+  public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
+      QueryRunnerFactory<T, QueryType> factory,
+      String segmentId,
+      Segment adapter
+  )
+  {
+    return new FinalizeResultsQueryRunner<T>(
+        new BySegmentQueryRunner<T>(
+            segmentId,
+            adapter.getDataInterval().getStart(),
+            factory.createRunner(adapter)
+        ),
+        (QueryToolChest<T, Query<T>>) factory.getToolchest()
+    );
+  }
+
+  public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
+  {
+    @Override
+    public void registerQuery(Query query, ListenableFuture future)
+    {
+
+    }
+  };
+
+  public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
+  {
+    return new IntervalChunkingQueryRunnerDecorator(null, null, null)
+    {
+      @Override
+      public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
+      {
+        return new QueryRunner<T>()
+        {
+          @Override
+          public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
+          {
+            return delegate.run(queryPlus, responseContext);
+          }
+        };
+      }
+    };
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 0a1d327a5a0..f9bd40599ae 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -9198,6 +9198,52 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
 
+  @Test
+  public void testRejectForceLimitPushDownWithHaving()
+  {
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Cannot force limit push down when a having spec is present.");
+
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .setDimensions(
+            Arrays.asList(
+                new DefaultDimensionSpec(
+                    QueryRunnerTestHelper.marketDimension,
+                    "marketalias"
+                )
+            )
+        )
+        .setInterval(QueryRunnerTestHelper.fullOnInterval)
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Lists.newArrayList(
+                    new OrderByColumnSpec(
+                        "marketalias",
+                        OrderByColumnSpec.Direction.DESCENDING
+                    )
+                ),
+                2
+            )
+        )
+        .setAggregatorSpecs(
+            Lists.newArrayList(
+                QueryRunnerTestHelper.rowsCount
+            )
+        )
+        .setContext(
+            ImmutableMap.of(
+                GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN,
+                true
+            )
+        )
+        .setHavingSpec(
+            new GreaterThanHavingSpec("rows", 10)
+        )
+        .build();
+  }
+
   @Test
   public void testTypeConversionWithMergingChainedExecutionRunner()
   {
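
For illustration only (not part of the patch): a minimal sketch of how the change surfaces to callers, assuming this Druid revision's APIs and import paths. A having spec by itself now simply disables limit push down, while combining a having spec with the forceLimitPushDown context key is rejected with an IAE when the query is built. The class name below is hypothetical; the query shape mirrors the new tests above.

// Hedged sketch, not part of the patch. Assumes the Druid classes used in the tests above;
// import paths are taken from this revision of the codebase.
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.Collections;

public class LimitPushDownWithHavingSketch
{
  public static void main(String[] args)
  {
    GroupByQuery.Builder builder = GroupByQuery.builder()
        .setDataSource("blah")
        .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(Collections.singletonList(new Interval(0, 1000000))))
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("dimA", null)))
        .setAggregatorSpecs(Arrays.asList(new LongSumAggregatorFactory("metA", "metA")))
        .setLimitSpec(new DefaultLimitSpec(
            Arrays.asList(new OrderByColumnSpec("dimA", OrderByColumnSpec.Direction.ASCENDING)),
            1
        ))
        .setHavingSpec(new GreaterThanHavingSpec("metA", 110))
        .setGranularity(Granularities.ALL);

    // A having spec alone still builds, but limit push down is no longer applied.
    GroupByQuery withHaving = builder.build();
    System.out.println(withHaving.isApplyLimitPushDown()); // false after this patch

    // Forcing limit push down while a having spec is present now fails fast at construction time.
    try {
      builder.setContext(ImmutableMap.<String, Object>of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
             .build();
    }
    catch (IAE e) {
      System.out.println(e.getMessage()); // Cannot force limit push down when a having spec is present.
    }
  }
}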