From 255399720030ecf249356990863b98017670223e Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Wed, 27 Jul 2016 18:44:19 -0700
Subject: [PATCH] Associate groupBy v2 resources with the Sequence lifecycle.
 (#3296)

This fixes a potential issue where groupBy resources could be allocated to
create a Sequence, but then the Sequence is never used, and thus the
resources are never freed. Also simplifies how groupBy handles config
overrides (this made the new unit test easier to write).
---
 docs/content/querying/groupbyquery.md         |   1 +
 .../druid/query/GroupByMergedQueryRunner.java |  12 +-
 .../query/groupby/GroupByQueryConfig.java     |  36 ++
 .../query/groupby/GroupByQueryEngine.java     |  12 +-
 .../query/groupby/GroupByQueryHelper.java     |   6 +-
 .../groupby/epinephelinae/BufferGrouper.java  |   3 +-
 .../GroupByMergingQueryRunnerV2.java          | 420 +++++++++---------
 .../epinephelinae/GroupByQueryEngineV2.java   |   8 +-
 .../query/groupby/epinephelinae/Grouper.java  |   4 +-
 .../strategy/GroupByStrategySelector.java     |   3 +-
 .../groupby/strategy/GroupByStrategyV1.java   |   2 +-
 .../groupby/strategy/GroupByStrategyV2.java   |   7 +-
 .../query/groupby/GroupByQueryRunnerTest.java |  30 +-
 13 files changed, 293 insertions(+), 251 deletions(-)

diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md
index 692339b267f..57cdff5c3cd 100644
--- a/docs/content/querying/groupbyquery.md
+++ b/docs/content/querying/groupbyquery.md
@@ -203,3 +203,4 @@ When using the "v2" strategy, the following query context parameters apply:
 |--------|-----------|
 |`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
 |`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|
+|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java
index 5040d34946a..4eae0d84ff7 100644
--- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java
@@ -56,8 +56,6 @@ import java.util.concurrent.TimeoutException;
 public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
 {
-  private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
-
   private static final Logger log = new Logger(GroupByMergedQueryRunner.class);
   private final Iterable<QueryRunner<T>> queryables;
   private final ListeningExecutorService exec;
@@ -84,15 +82,11 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
   public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> responseContext)
   {
     final GroupByQuery query = (GroupByQuery) queryParam;
-
-    final boolean isSingleThreaded = query.getContextValue(
-        CTX_KEY_IS_SINGLE_THREADED,
-        configSupplier.get().isSingleThreaded()
-    );
-
+    final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+    final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded();
     final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
         query,
-        configSupplier.get(),
+        querySpecificConfig,
         bufferPool
     );
     final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
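
Note: for readers unfamiliar with the lifecycle pattern this patch moves to, here is a minimal self-contained sketch of the idea, using hypothetical stand-in types rather than Druid's actual com.metamx.common.guava.BaseSequence API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hedged sketch (hypothetical types): acquire resources inside make(), release
// them in cleanup(). If the sequence is never iterated, make() never runs, so
// there is nothing to leak -- the guarantee the old eager allocation lacked.
public class SequenceLifecycleSketch
{
  static final AtomicBoolean resourceHeld = new AtomicBoolean(false);

  static Runnable makeSequence()
  {
    // Returning this "sequence" allocates nothing yet.
    return () -> {
      resourceHeld.set(true);             // make(): acquire merge buffer, temp storage, grouper
      try {
        // ... iterate and merge rows here ...
      }
      finally {
        resourceHeld.set(false);          // cleanup(): always release after iteration
      }
    };
  }

  public static void main(String[] args)
  {
    Runnable sequence = makeSequence();   // never run -> nothing acquired, nothing leaked
    System.out.println("held after build? " + resourceHeld.get()); // false
    sequence.run();
    System.out.println("held after run? " + resourceHeld.get());   // false
  }
}
```
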
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
index 6a669e443c5..9fb38558aeb 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
@@ -26,6 +26,14 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector;
  */
 public class GroupByQueryConfig
 {
+  public static final String CTX_KEY_STRATEGY = "groupByStrategy";
+  private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
+  private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
+  private static final String CTX_KEY_MAX_RESULTS = "maxResults";
+  private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
+  private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
+  private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
+
   @JsonProperty
   private String defaultStrategy = GroupByStrategySelector.STRATEGY_V1;
 
@@ -107,4 +115,32 @@ public class GroupByQueryConfig
   {
     return maxOnDiskStorage;
   }
+
+  public GroupByQueryConfig withOverrides(final GroupByQuery query)
+  {
+    final GroupByQueryConfig newConfig = new GroupByQueryConfig();
+    newConfig.defaultStrategy = query.getContextValue(CTX_KEY_STRATEGY, getDefaultStrategy());
+    newConfig.singleThreaded = query.getContextBoolean(CTX_KEY_IS_SINGLE_THREADED, isSingleThreaded());
+    newConfig.maxIntermediateRows = Math.min(
+        query.getContextValue(CTX_KEY_MAX_INTERMEDIATE_ROWS, getMaxIntermediateRows()),
+        getMaxIntermediateRows()
+    );
+    newConfig.maxResults = Math.min(
+        query.getContextValue(CTX_KEY_MAX_RESULTS, getMaxResults()),
+        getMaxResults()
+    );
+    newConfig.bufferGrouperInitialBuckets = query.getContextValue(
+        CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS,
+        getBufferGrouperInitialBuckets()
+    );
+    newConfig.bufferGrouperMaxSize = Math.min(
+        query.getContextValue(CTX_KEY_BUFFER_GROUPER_MAX_SIZE, getBufferGrouperMaxSize()),
+        getBufferGrouperMaxSize()
+    );
+    newConfig.maxOnDiskStorage = Math.min(
+        ((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
+        getMaxOnDiskStorage()
+    );
+    return newConfig;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
index 984d10210a6..756078680eb 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
@@ -68,8 +68,6 @@ import java.util.NoSuchElementException;
  */
 public class GroupByQueryEngine
 {
-  private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
-
   private final Supplier<GroupByQueryConfig> config;
   private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
 
@@ -310,16 +308,12 @@ public class GroupByQueryEngine
 
     public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config)
     {
+      final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
+
       this.query = query;
       this.cursor = cursor;
       this.metricsBuffer = metricsBuffer;
-
-      this.maxIntermediateRows = Math.min(
-          query.getContextValue(
-              CTX_KEY_MAX_INTERMEDIATE_ROWS,
-              config.getMaxIntermediateRows()
-          ), config.getMaxIntermediateRows()
-      );
+      this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows();
 
       unprocessedKeys = null;
       delegate = Iterators.emptyIterator();
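
Note: a small illustration of the withOverrides() clamping above, with hypothetical numbers. A context override may tighten a configured limit but can never exceed it, because every limit passes through Math.min against the server-side value:

```java
// Hypothetical values illustrating the Math.min clamping in withOverrides().
public class OverrideClampSketch
{
  public static void main(String[] args)
  {
    final int configuredMaxResults = 500_000;  // server-side druid.query.groupBy.maxResults
    final int contextMaxResults = 1_000_000;   // "maxResults" from the query context

    // Context overrides can only lower a limit, never raise it.
    final int effective = Math.min(contextMaxResults, configuredMaxResults);
    System.out.println(effective); // 500000
  }
}
```
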
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
index 37ad1505d5a..d13ec68bc47 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public class GroupByQueryHelper
 {
-  private static final String CTX_KEY_MAX_RESULTS = "maxResults";
   public final static String CTX_KEY_SORT_RESULTS = "sortResults";
 
   public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
       final GroupByQuery query,
@@ -54,6 +53,7 @@
       StupidPool<ByteBuffer> bufferPool
   )
   {
+    final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
     final QueryGranularity gran = query.getGranularity();
     final long timeStart = query.getIntervals().get(0).getStartMillis();
 
@@ -97,7 +97,7 @@ public class GroupByQueryHelper
             false,
             true,
             sortResults,
-            Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
+            querySpecificConfig.getMaxResults(),
             bufferPool
         );
       } else {
@@ -110,7 +110,7 @@ public class GroupByQueryHelper
             false,
             true,
             sortResults,
-            Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
+            querySpecificConfig.getMaxResults()
         );
       }
 
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java
index 592ff2720be..273158e69e3 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java
@@ -60,6 +60,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
 {
   private static final Logger log = new Logger(BufferGrouper.class);
 
+  private static final int MIN_INITIAL_BUCKETS = 4;
   private static final int DEFAULT_INITIAL_BUCKETS = 1024;
   private static final float MAX_LOAD_FACTOR = 0.75f;
   private static final int HASH_SIZE = Ints.BYTES;
@@ -104,7 +105,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
     this.aggregators = new BufferAggregator[aggregatorFactories.length];
     this.aggregatorOffsets = new int[aggregatorFactories.length];
     this.bufferGrouperMaxSize = bufferGrouperMaxSize;
-    this.initialBuckets = initialBuckets > 0 ? initialBuckets : DEFAULT_INITIAL_BUCKETS;
+    this.initialBuckets = initialBuckets > 0 ? Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS;
 
     int offset = HASH_SIZE + keySize;
     for (int i = 0; i < aggregatorFactories.length; i++) {
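
Note: a plausible motivation for the new MIN_INITIAL_BUCKETS floor, sketched with the constants above (the reasoning is an assumption, not stated in the patch): under a 0.75 load factor, a very small table could hold zero entries before needing to grow.

```java
// Hedged arithmetic sketch using BufferGrouper's constants: a table of n
// buckets holds at most (int) (n * 0.75) entries before it must grow, so a
// caller-supplied initialBuckets of 1 would allow zero entries without the
// Math.max(MIN_INITIAL_BUCKETS, ...) floor.
public class BucketFloorSketch
{
  public static void main(String[] args)
  {
    final float maxLoadFactor = 0.75f;
    for (int buckets : new int[]{1, 2, 4, 1024}) {
      System.out.println(buckets + " buckets -> max " + (int) (buckets * maxLoadFactor) + " entries");
    }
    // Prints: 1 -> 0, 2 -> 1, 4 -> 3, 1024 -> 768
  }
}
```
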
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index febbeb7301f..95f42743ee6 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -41,12 +41,12 @@ import com.metamx.common.ISE;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.BaseSequence;
 import com.metamx.common.guava.CloseQuietly;
-import com.metamx.common.guava.ResourceClosingSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.logger.Logger;
 import io.druid.collections.BlockingPool;
 import io.druid.collections.ReferenceCountingResourceHolder;
 import io.druid.collections.Releaser;
+import io.druid.common.utils.JodaUtils;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.query.AbstractPrioritizedCallable;
@@ -119,6 +119,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
   public Sequence<Row> run(final Query<Row> queryParam, final Map<String, Object> responseContext)
   {
     final GroupByQuery query = (GroupByQuery) queryParam;
+    final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
 
     // CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is here because realtime servers use nested mergeRunners calls
     // (one for the entire query and one for each sink). We only want the outer call to actually do merging with a
@@ -142,246 +143,239 @@
       combiningAggregatorFactories[i] = query.getAggregatorSpecs().get(i).getCombiningFactory();
     }
 
-    final GroupByMergingKeySerdeFactory keySerdeFactory = new GroupByMergingKeySerdeFactory(
-        query.getDimensions().size(),
-        config.getMaxMergingDictionarySize() / concurrencyHint
-    );
-    final GroupByMergingColumnSelectorFactory columnSelectorFactory = new GroupByMergingColumnSelectorFactory();
-
     final File temporaryStorageDirectory = new File(
         System.getProperty("java.io.tmpdir"),
         String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
     );
 
-    final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
-        temporaryStorageDirectory,
-        config.getMaxOnDiskStorage()
-    );
-
     // Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual
     // query processing together.
-    final long startTime = System.currentTimeMillis();
-    final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
-    final long timeoutAt = timeout == null ? -1L : startTime + timeout.longValue();
+    final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
+    final long timeoutAt = queryTimeout == null
+                           ? JodaUtils.MAX_INSTANT
+                           : System.currentTimeMillis() + queryTimeout.longValue();
 
-    final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
+    return new BaseSequence<>(
+        new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<GroupByMergingKey, Row>>()
+        {
+          @Override
+          public CloseableGrouperIterator<GroupByMergingKey, Row> make()
+          {
+            final List<Closeable> closeOnFailure = Lists.newArrayList();
 
-    try {
-      // This will potentially block if there are no merge buffers left in the pool.
-      mergeBufferHolder = mergeBufferPool.take(timeout != null && timeout.longValue() > 0 ? timeout.longValue() : -1);
-    }
-    catch (InterruptedException e) {
-      CloseQuietly.close(temporaryStorage);
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    }
+            try {
+              final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
+              final LimitedTemporaryStorage temporaryStorage;
+              final Grouper<GroupByMergingKey> grouper;
 
-    final long processingStartTime = System.currentTimeMillis();
+              temporaryStorage = new LimitedTemporaryStorage(
+                  temporaryStorageDirectory,
+                  querySpecificConfig.getMaxOnDiskStorage()
+              );
+              closeOnFailure.add(temporaryStorage);
 
-    try {
-      return new ResourceClosingSequence<>(
-          new BaseSequence<>(
-              new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<GroupByMergingKey, Row>>()
+              try {
+                // This will potentially block if there are no merge buffers left in the pool.
+                final long timeout = timeoutAt - System.currentTimeMillis();
+                if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
+                  throw new QueryInterruptedException(new TimeoutException());
+                }
+                closeOnFailure.add(mergeBufferHolder);
+              }
+              catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw Throwables.propagate(e);
+              }
+
+              final GroupByMergingKeySerdeFactory keySerdeFactory = new GroupByMergingKeySerdeFactory(
+                  query.getDimensions().size(),
+                  querySpecificConfig.getMaxMergingDictionarySize() / concurrencyHint
+              );
+              final GroupByMergingColumnSelectorFactory columnSelectorFactory = new GroupByMergingColumnSelectorFactory();
+
+              grouper = new ConcurrentGrouper<>(
+                  mergeBufferHolder.get(),
+                  concurrencyHint,
+                  temporaryStorage,
+                  spillMapper,
+                  querySpecificConfig.getBufferGrouperMaxSize(),
+                  querySpecificConfig.getBufferGrouperInitialBuckets(),
+                  keySerdeFactory,
+                  columnSelectorFactory,
+                  combiningAggregatorFactories
+              );
+              closeOnFailure.add(grouper);
+
+              final Accumulator<Grouper<GroupByMergingKey>, Row> accumulator = new Accumulator<Grouper<GroupByMergingKey>, Row>()
                 {
                   @Override
-                  public CloseableGrouperIterator<GroupByMergingKey, Row> make()
+                  public Grouper<GroupByMergingKey> accumulate(
+                      final Grouper<GroupByMergingKey> theGrouper,
+                      final Row row
+                  )
                   {
-                    final Grouper<GroupByMergingKey> grouper = new ConcurrentGrouper<>(
-                        mergeBufferHolder.get(),
-                        concurrencyHint,
-                        temporaryStorage,
-                        spillMapper,
-                        config.getBufferGrouperMaxSize(),
-                        GroupByStrategyV2.getBufferGrouperInitialBuckets(config, query),
-                        keySerdeFactory,
-                        columnSelectorFactory,
-                        combiningAggregatorFactories
-                    );
+                    if (theGrouper == null) {
+                      // Pass-through null returns without doing more work.
+                      return null;
+                    }
 
-                    final Accumulator<Grouper<GroupByMergingKey>, Row> accumulator = new Accumulator<Grouper<GroupByMergingKey>, Row>()
+                    final long timestamp = row.getTimestampFromEpoch();
+
+                    final String[] dimensions = new String[query.getDimensions().size()];
+                    for (int i = 0; i < dimensions.length; i++) {
+                      final Object dimValue = row.getRaw(query.getDimensions().get(i).getOutputName());
+                      dimensions[i] = Strings.nullToEmpty((String) dimValue);
+                    }
+
+                    columnSelectorFactory.setRow(row);
+                    final boolean didAggregate = theGrouper.aggregate(new GroupByMergingKey(timestamp, dimensions));
+                    if (!didAggregate) {
+                      // null return means grouping resources were exhausted.
+                      return null;
+                    }
+                    columnSelectorFactory.setRow(null);
+
+                    return theGrouper;
+                  }
+                };
+
+                final int priority = BaseQuery.getContextPriority(query, 0);
+
+                final ReferenceCountingResourceHolder<Grouper<GroupByMergingKey>> grouperHolder = new ReferenceCountingResourceHolder<>(
+                    grouper,
+                    new Closeable()
                     {
                       @Override
-                      public Grouper<GroupByMergingKey> accumulate(
-                          final Grouper<GroupByMergingKey> theGrouper,
-                          final Row row
-                      )
+                      public void close() throws IOException
                       {
-                        if (theGrouper == null) {
-                          // Pass-through null returns without doing more work.
-                          return null;
-                        }
-
-                        final long timestamp = row.getTimestampFromEpoch();
-
-                        final String[] dimensions = new String[query.getDimensions().size()];
-                        for (int i = 0; i < dimensions.length; i++) {
-                          final Object dimValue = row.getRaw(query.getDimensions().get(i).getOutputName());
-                          dimensions[i] = Strings.nullToEmpty((String) dimValue);
-                        }
-
-                        columnSelectorFactory.setRow(row);
-                        final boolean didAggregate = theGrouper.aggregate(new GroupByMergingKey(timestamp, dimensions));
-                        if (!didAggregate) {
-                          // null return means grouping resources were exhausted.
-                          return null;
-                        }
-                        columnSelectorFactory.setRow(null);
-
-                        return theGrouper;
+                        grouper.close();
                       }
-                    };
+                    }
+                );
 
-                    final int priority = BaseQuery.getContextPriority(query, 0);
+                ListenableFuture<List<Boolean>> futures = Futures.allAsList(
+                    Lists.newArrayList(
+                        Iterables.transform(
+                            queryables,
+                            new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
+                            {
+                              @Override
+                              public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
+                              {
+                                if (input == null) {
+                                  throw new ISE(
+                                      "Null queryRunner! Looks to be some segment unmapping action happening"
+                                  );
+                                }
 
-                    final ReferenceCountingResourceHolder<Grouper<GroupByMergingKey>> grouperHolder = new ReferenceCountingResourceHolder<>(
-                        grouper,
-                        new Closeable()
-                        {
-                          @Override
-                          public void close() throws IOException
-                          {
-                            grouper.close();
-                          }
-                        }
-                    );
+                                final Releaser bufferReleaser = mergeBufferHolder.increment();
+                                try {
+                                  final Releaser grouperReleaser = grouperHolder.increment();
+                                  try {
+                                    return exec.submit(
+                                        new AbstractPrioritizedCallable<Boolean>(priority)
+                                        {
+                                          @Override
+                                          public Boolean call() throws Exception
+                                          {
+                                            try {
+                                              final Object retVal = input.run(queryForRunners, responseContext)
+                                                                         .accumulate(grouper, accumulator);
 
-                    try {
-                      ListenableFuture<List<Boolean>> futures = Futures.allAsList(
-                          Lists.newArrayList(
-                              Iterables.transform(
-                                  queryables,
-                                  new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
-                                  {
-                                    @Override
-                                    public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
-                                    {
-                                      if (input == null) {
-                                        throw new ISE(
-                                            "Null queryRunner! Looks to be some segment unmapping action happening"
-                                        );
-                                      }
-
-                                      final Releaser bufferReleaser = mergeBufferHolder.increment();
-                                      try {
-                                        final Releaser grouperReleaser = grouperHolder.increment();
-                                        try {
-                                          return exec.submit(
-                                              new AbstractPrioritizedCallable<Boolean>(priority)
-                                              {
-                                                @Override
-                                                public Boolean call() throws Exception
-                                                {
-                                                  try {
-                                                    final Object retVal = input.run(queryForRunners, responseContext)
-                                                                               .accumulate(grouper, accumulator);
-
-                                                    // Return true if OK, false if resources were exhausted.
-                                                    return retVal == grouper;
-                                                  }
-                                                  catch (QueryInterruptedException e) {
-                                                    throw e;
-                                                  }
-                                                  catch (Exception e) {
-                                                    log.error(e, "Exception with one of the sequences!");
-                                                    throw Throwables.propagate(e);
-                                                  }
-                                                  finally {
-                                                    grouperReleaser.close();
-                                                    bufferReleaser.close();
-                                                  }
-                                                }
-                                              }
-                                          );
+                                              // Return true if OK, false if resources were exhausted.
+                                              return retVal == grouper;
+                                            }
+                                            catch (QueryInterruptedException e) {
+                                              throw e;
+                                            }
+                                            catch (Exception e) {
+                                              log.error(e, "Exception with one of the sequences!");
+                                              throw Throwables.propagate(e);
+                                            }
+                                            finally {
+                                              grouperReleaser.close();
+                                              bufferReleaser.close();
+                                            }
+                                          }
                                         }
-                                        catch (Exception e) {
-                                          // Exception caught while submitting the task; release resources.
-                                          grouperReleaser.close();
-                                          throw e;
-                                        }
-                                      }
-                                      catch (Exception e) {
-                                        // Exception caught while submitting the task; release resources.
-                                        bufferReleaser.close();
-                                        throw e;
-                                      }
-                                    }
-                                  }
-                              )
-                          )
-                      );
-
-                      waitForFutureCompletion(query, futures, timeoutAt - processingStartTime);
-                    }
-                    catch (Exception e) {
-                      // Exception caught while creating or waiting for futures; release resources.
-                      grouperHolder.close();
-                      throw e;
-                    }
-
-                    return new CloseableGrouperIterator<>(
-                        grouper,
-                        true,
-                        new Function<Grouper.Entry<GroupByMergingKey>, Row>()
-                        {
-                          @Override
-                          public Row apply(Grouper.Entry<GroupByMergingKey> entry)
-                          {
-                            Map<String, Object> theMap = Maps.newLinkedHashMap();
-
-                            // Add dimensions.
-                            for (int i = 0; i < entry.getKey().getDimensions().length; i++) {
-                              theMap.put(
-                                  query.getDimensions().get(i).getOutputName(),
-                                  Strings.emptyToNull(entry.getKey().getDimensions()[i])
-                              );
+                                    catch (Exception e) {
+                                      // Exception caught while submitting the task; release resources.
+                                      grouperReleaser.close();
+                                      throw e;
+                                    }
+                                  }
+                                  catch (Exception e) {
+                                    // Exception caught while submitting the task; release resources.
+                                    bufferReleaser.close();
+                                    throw e;
+                                  }
+                                }
                               }
+                        )
+                    )
+                );
 
-                            // Add aggregations.
-                            for (int i = 0; i < entry.getValues().length; i++) {
-                              theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
-                            }
+                waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis());
 
-                            return new MapBasedRow(
-                                query.getGranularity().toDateTime(entry.getKey().getTimestamp()),
-                                theMap
-                            );
-                          }
-                        },
-                        new Closeable()
-                        {
-                          @Override
-                          public void close() throws IOException
-                          {
-                            grouperHolder.close();
-                          }
+                return new CloseableGrouperIterator<>(
+                    grouper,
+                    true,
+                    new Function<Grouper.Entry<GroupByMergingKey>, Row>()
+                    {
+                      @Override
+                      public Row apply(Grouper.Entry<GroupByMergingKey> entry)
+                      {
+                        Map<String, Object> theMap = Maps.newLinkedHashMap();
+
+                        // Add dimensions.
+                        for (int i = 0; i < entry.getKey().getDimensions().length; i++) {
+                          theMap.put(
+                              query.getDimensions().get(i).getOutputName(),
+                              Strings.emptyToNull(entry.getKey().getDimensions()[i])
+                          );
                         }
-                    );
-                  }
 
-                  @Override
-                  public void cleanup(CloseableGrouperIterator<GroupByMergingKey, Row> iterFromMake)
-                  {
-                    iterFromMake.close();
-                  }
+                        // Add aggregations.
+                        for (int i = 0; i < entry.getValues().length; i++) {
+                          theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
+                        }
+
+                        return new MapBasedRow(
+                            query.getGranularity().toDateTime(entry.getKey().getTimestamp()),
+                            theMap
+                        );
+                      }
+                    },
+                    new Closeable()
+                    {
+                      @Override
+                      public void close() throws IOException
+                      {
+                        grouperHolder.close();
+                        mergeBufferHolder.close();
+                        CloseQuietly.close(temporaryStorage);
+                      }
+                    }
+                );
+              }
+              catch (Throwable e) {
+                // Exception caught while setting up the iterator; release resources.
+                for (Closeable closeable : Lists.reverse(closeOnFailure)) {
+                  CloseQuietly.close(closeable);
                 }
-            ),
-            new Closeable()
-            {
-              @Override
-              public void close() throws IOException
-              {
-                mergeBufferHolder.close();
-                CloseQuietly.close(temporaryStorage);
+                throw e;
               }
             }
-        );
-    }
-    catch (Exception e) {
-      // Exception caught while creating the sequence; release resources.
-      mergeBufferHolder.close();
-      CloseQuietly.close(temporaryStorage);
-      throw e;
-    }
+
+          @Override
+          public void cleanup(CloseableGrouperIterator<GroupByMergingKey, Row> iterFromMake)
+          {
+            iterFromMake.close();
+          }
+        }
+    );
   }
 
   private void waitForFutureCompletion(
@@ -391,16 +385,16 @@
   )
   {
     try {
-      final List<Boolean> results;
       if (queryWatcher != null) {
         queryWatcher.registerQuery(query, future);
       }
+
       if (timeout <= 0) {
-        results = future.get();
-      } else {
-        results = future.get(timeout, TimeUnit.MILLISECONDS);
+        throw new TimeoutException();
       }
 
+      final List<Boolean> results = future.get(timeout, TimeUnit.MILLISECONDS);
+
       for (Boolean result : results) {
         if (!result) {
           future.cancel(true);
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index faea1392936..47dd3998596 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -148,7 +148,7 @@ public class GroupByQueryEngineV2
   private static class GroupByEngineIterator implements Iterator<Row>, Closeable
   {
     private final GroupByQuery query;
-    private final GroupByQueryConfig config;
+    private final GroupByQueryConfig querySpecificConfig;
     private final Cursor cursor;
     private final ByteBuffer buffer;
     private final Grouper.KeySerde<ByteBuffer> keySerde;
@@ -174,7 +174,7 @@ public class GroupByQueryEngineV2
     final int dimCount = query.getDimensions().size();
 
     this.query = query;
-    this.config = config;
+    this.querySpecificConfig = config.withOverrides(query);
     this.cursor = cursor;
     this.buffer = buffer;
     this.keySerde = keySerde;
@@ -213,8 +213,8 @@ public class GroupByQueryEngineV2
             cursor,
             query.getAggregatorSpecs()
                  .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]),
-            config.getBufferGrouperMaxSize(),
-            GroupByStrategyV2.getBufferGrouperInitialBuckets(config, query)
+            querySpecificConfig.getBufferGrouperMaxSize(),
+            querySpecificConfig.getBufferGrouperInitialBuckets()
         );
 
 outer:
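
Note: the rewritten make() above relies on a close-on-failure list: each resource is registered the moment it is acquired, and any Throwable during setup closes what was acquired so far in reverse order before rethrowing. A self-contained sketch of that idiom, with hypothetical resource names:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the closeOnFailure idiom from make(): register each resource as it
// is acquired; on any failure, close the acquired ones in reverse order.
public class CloseOnFailureSketch
{
  public static void main(String[] args) throws Exception
  {
    final List<Closeable> closeOnFailure = new ArrayList<>();
    try {
      closeOnFailure.add(acquire("temporaryStorage"));
      closeOnFailure.add(acquire("mergeBufferHolder"));
      closeOnFailure.add(acquire("grouper"));
      // ... hand the resources to the iterator, which now owns closing them ...
    }
    catch (Throwable t) {
      // Mirrors Lists.reverse(closeOnFailure) in the patch.
      Collections.reverse(closeOnFailure);
      for (Closeable c : closeOnFailure) {
        try { c.close(); } catch (Exception ignored) { /* CloseQuietly equivalent */ }
      }
      throw t;
    }
  }

  static Closeable acquire(final String name)
  {
    System.out.println("acquired " + name);
    return () -> System.out.println("closed " + name);
  }
}
```
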
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java
index 68570429212..c5117e65831 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java
@@ -22,6 +22,7 @@ package io.druid.query.groupby.epinephelinae;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -36,7 +37,7 @@ import java.util.Iterator;
  *
 * @param <KeyType> type of the key that will be passed in
  */
-public interface Grouper<KeyType extends Comparable<KeyType>>
+public interface Grouper<KeyType extends Comparable<KeyType>> extends Closeable
 {
   /**
    * Aggregate the current row with the provided key. Some implementations are thread-safe and
@@ -67,6 +68,7 @@ public interface Grouper<KeyType extends Comparable<KeyType>>
 
   /**
    * Close the grouper and release associated resources.
    */
+  @Override
   void close();
 
   /**
diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java
index 220418747a2..2f58ca0f238 100644
--- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java
+++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java
@@ -27,7 +27,6 @@ import io.druid.query.groupby.GroupByQueryConfig;
 public class GroupByStrategySelector
 {
-  public static final String CTX_KEY_STRATEGY = "groupByStrategy";
   public static final String STRATEGY_V2 = "v2";
   public static final String STRATEGY_V1 = "v1";
 
@@ -49,7 +48,7 @@ public class GroupByStrategySelector
   public GroupByStrategy strategize(GroupByQuery query)
   {
-    final String strategyString = query.getContextValue(CTX_KEY_STRATEGY, config.getDefaultStrategy());
+    final String strategyString = config.withOverrides(query).getDefaultStrategy();
 
     switch (strategyString) {
       case STRATEGY_V2:
diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java
index 1e17084c0dd..546fd14c400 100644
--- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java
+++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java
@@ -99,7 +99,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
                 //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return
                 //merged results
                 GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false,
-                GroupByStrategySelector.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
+                GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
             )
         ),
         responseContext
diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java
index a207bd5ff25..acc9856ffae 100644
--- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -142,7 +142,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
         ).withOverriddenContext(
             ImmutableMap.<String, Object>of(
                 "finalize", false,
-                GroupByStrategySelector.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
+                GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
                 CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp
             )
         ),
@@ -203,9 +203,4 @@ public class GroupByStrategyV2 implements GroupByStrategy
   {
     return GroupByQueryEngineV2.process(query, storageAdapter, bufferPool, configSupplier.get());
   }
-
-  public static int getBufferGrouperInitialBuckets(final GroupByQueryConfig config, final GroupByQuery query)
-  {
-    return query.getContextValue("bufferGrouperInitialBuckets", config.getBufferGrouperInitialBuckets());
-  }
 }
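
Note: with CTX_KEY_STRATEGY and the other override keys consolidated onto GroupByQueryConfig, a caller pins the strategy or tightens limits for a single query purely through context entries. A hedged sketch of such a context map (the key strings match the constants above; the query-builder plumbing is omitted):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: context entries a caller might attach to one groupBy query. Values
// here are hypothetical; each override can only lower the server defaults.
public class ContextOverrideSketch
{
  public static void main(String[] args)
  {
    final Map<String, Object> context = new HashMap<>();
    context.put("groupByStrategy", "v2");       // GroupByQueryConfig.CTX_KEY_STRATEGY
    context.put("maxOnDiskStorage", 0L);        // forbid spilling for this query
    context.put("bufferGrouperInitialBuckets", 16);
    System.out.println(context);
  }
}
```
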
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 983cced4434..8322bbbe42c 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -156,7 +156,7 @@ public class GroupByQueryRunnerTest
             return ByteBuffer.allocate(10 * 1024 * 1024);
           }
         },
-        4
+        2 // There are some tests that need to allocate two buffers (simulating two levels of merging)
     );
     final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
         configSupplier,
@@ -854,7 +854,7 @@ public class GroupByQueryRunnerTest
   }
 
   @Test
-  public void testGroupByMaxRowsLimitContextOverrid()
+  public void testGroupByMaxRowsLimitContextOverride()
   {
     GroupByQuery query = GroupByQuery
         .builder()
@@ -878,6 +878,32 @@ public class GroupByQueryRunnerTest
     GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
   }
 
+  @Test
+  public void testGroupByMaxOnDiskStorageContextOverride()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "bufferGrouperMaxSize", 1))
+        .build();
+
+    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+      expectedException.expect(ISE.class);
+      expectedException.expectMessage("Grouping resources exhausted");
+    }
+
+    GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+  }
+
   @Test
   public void testGroupByWithRebucketRename()
   {
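
Note on the expected failure in the new test: with bufferGrouperMaxSize = 1 and maxOnDiskStorage = 0, the grouper can neither hold a second group in its buffer nor spill it to disk, so aggregate() returns false, the accumulator returns null, and the merging runner surfaces "Grouping resources exhausted". A condensed, hypothetical sketch of that path (stand-ins, not the real Druid classes):

```java
// Toy model of the exhaustion path the test exercises: a grouper capped at one
// group with no spill space rejects the second distinct key, and the runner
// turns the rejection into the "Grouping resources exhausted" error.
public class ExhaustionSketch
{
  static final int BUFFER_GROUPER_MAX_SIZE = 1; // "bufferGrouperMaxSize" context override
  static final long MAX_ON_DISK_STORAGE = 0L;   // "maxOnDiskStorage" context override

  public static void main(String[] args)
  {
    int groups = 0;
    for (String key : new String[]{"automotive", "business"}) {
      final boolean didAggregate = groups < BUFFER_GROUPER_MAX_SIZE || MAX_ON_DISK_STORAGE > 0;
      if (!didAggregate) {
        throw new IllegalStateException("Grouping resources exhausted"); // second key fails
      }
      groups++;
    }
  }
}
```
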