diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index 3c275b16c8c..82f2d81a39e 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -141,8 +141,11 @@ on-heap by default, but it can optionally store aggregated values off-heap. Query API and results are compatible between the two engines; however, there are some differences from a cluster configuration perspective: -- groupBy v1 merges results in heap, whereas groupBy v2 merges results off-heap. As a result, optimal configuration for -your Druid nodes may involve less heap (-Xmx, -Xms) and more direct memory (-XX:MaxDirectMemorySize). +- groupBy v1 controls resource usage using a row-based limit (maxResults), whereas groupBy v2 uses bytes-based limits. +In addition, groupBy v1 merges results on-heap, whereas groupBy v2 merges results off-heap. These factors mean that +memory tuning and resource limits behave differently between v1 and v2. In particular, some queries that complete +successfully on one engine may exceed resource limits and fail on the other. See the +"Memory tuning and resource limits" section for more details. - groupBy v1 imposes no limit on the number of concurrently running queries, whereas groupBy v2 controls memory usage by using a finite-sized merge buffer pool. By default, the number of merge buffers is 1/4 the number of processing threads. You can adjust this as necessary to balance concurrency and memory usage. @@ -151,6 +154,37 @@ historical nodes. - groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2 ignores chunkPeriod. +#### Memory tuning and resource limits + +When using groupBy v2, three parameters control resource usage and limits: + +- druid.processing.buffer.sizeBytes: size of the off-heap hash table used for aggregation, per query, in bytes. At +most druid.processing.numMergeBuffers of these will be created at once, which also serves as an upper limit on the +number of concurrently running groupBy queries. +- druid.query.groupBy.maxMergingDictionarySize: size of the on-heap dictionary used when grouping on strings, per query, +in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size. +- druid.query.groupBy.maxOnDiskStorage: amount of space on disk used for aggregation, per query, in bytes. By default, +this is 0, which means aggregation will not use disk. + +If maxOnDiskStorage is 0 (the default), then a query that exceeds either the on-heap dictionary limit or the off-heap +aggregation table limit will fail with a "Resource limit exceeded" error describing the limit that was exceeded. + +If maxOnDiskStorage is greater than 0, queries that exceed the in-memory limits will start using disk for aggregation. +In this case, when either the on-heap dictionary or off-heap hash table fills up, partially aggregated records will be +sorted and flushed to disk. Then, both in-memory structures will be cleared out for further aggregation. Queries that +then go on to exceed maxOnDiskStorage will fail with a "Resource limit exceeded" error indicating that they ran out of +disk space. + +With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries +will not exceed available memory for the maximum possible concurrent query load (given by +druid.processing.numMergeBuffers).
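To make the sizing guidance above concrete, here is a back-of-the-envelope sketch of the worst-case memory footprint implied by these parameters. This is an illustrative sketch, not part of the patch: the property names are the ones documented above, while the example values and class name are hypothetical.

```java
// Hypothetical sizing sketch for groupBy v2, assuming the worst case where
// every merge buffer is simultaneously held by a concurrently running query.
public class GroupByV2SizingSketch
{
  public static void main(String[] args)
  {
    final int numMergeBuffers = 4;                    // druid.processing.numMergeBuffers
    final long bufferSizeBytes = 512L << 20;          // druid.processing.buffer.sizeBytes (512MB, example value)
    final long maxMergingDictionarySize = 100L << 20; // druid.query.groupBy.maxMergingDictionarySize (100MB, example value)

    // Off-heap: one merge buffer per concurrent groupBy query; counts against -XX:MaxDirectMemorySize.
    final long worstCaseOffHeap = (long) numMergeBuffers * bufferSizeBytes;

    // On-heap: one merging dictionary per concurrent groupBy query; counts against -Xmx.
    // (groupBy v1 has no bytes-based analog; its knob is the row-based maxResults described below.)
    final long worstCaseOnHeap = (long) numMergeBuffers * maxMergingDictionarySize;

    System.out.printf(
        "worst-case off-heap: %,d bytes; worst-case on-heap dictionaries: %,d bytes%n",
        worstCaseOffHeap,
        worstCaseOnHeap
    );
  }
}
```

Note that this counts only the groupBy-specific structures named above; processing threads, segment data, and other query types need heap and direct memory of their own.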
+ +When using groupBy v1, all aggregation is done on-heap, and resource limits are enforced through the parameter +druid.query.groupBy.maxResults. This caps the maximum number of rows in a result set. Queries that exceed +this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster +operators should make sure that the on-heap aggregations will not exceed available JVM heap space for the expected +concurrent query load. + #### Alternatives There are some situations where other query types may be a better choice than groupBy. diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java index 2d8f6e6ae7a..c70b60e4228 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java @@ -75,14 +75,14 @@ public class BufferGrouperUsingSketchMergeAggregatorFactoryTest columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk()); } updateSketch.update(3); columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk()); } Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues(); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 9977ec124ff..64f67e14a4c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -256,18 +256,4 @@ public class GroupByQueryHelper // Don't include post-aggregators since we don't know what types they are. return types.build(); } - - /** - * Throw a {@link ResourceLimitExceededException}. Only used by groupBy v2 when accumulation resources - * are exceeded, triggered by false return from {@link io.druid.query.groupby.epinephelinae.Grouper#aggregate(Object)}. - * - * @return nothing will ever be returned; this return type is for your convenience, similar to - * Throwables.propagate in Guava. - */ - public static ResourceLimitExceededException throwAccumulationResourceLimitExceededException() - { - throw new ResourceLimitExceededException( - "Not enough resources to execute this query.
Try increasing druid.query.groupBy.maxOnDiskStorage, " - + "druid.query.groupBy.maxMergingDictionarySize, or druid.processing.buffer.sizeBytes."); - } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AggregateResult.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AggregateResult.java new file mode 100644 index 00000000000..57b1e9586fd --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AggregateResult.java @@ -0,0 +1,85 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.epinephelinae; + +import java.util.Objects; + +public class AggregateResult +{ + private static final AggregateResult OK = new AggregateResult(true, null); + + private final boolean ok; + private final String reason; + + public static AggregateResult ok() + { + return OK; + } + + public static AggregateResult failure(final String reason) + { + return new AggregateResult(false, reason); + } + + private AggregateResult(final boolean ok, final String reason) + { + this.ok = ok; + this.reason = reason; + } + + public boolean isOk() + { + return ok; + } + + public String getReason() + { + return reason; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AggregateResult that = (AggregateResult) o; + return ok == that.ok && + Objects.equals(reason, that.reason); + } + + @Override + public int hashCode() + { + return Objects.hash(ok, reason); + } + + @Override + public String toString() + { + return "AggregateResult{" + + "ok=" + ok + + ", reason='" + reason + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index bef3ba22682..9c4a25da2c8 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -59,6 +59,16 @@ import java.util.List; public class BufferGrouper implements Grouper { private static final Logger log = new Logger(BufferGrouper.class); + private static final AggregateResult DICTIONARY_FULL = AggregateResult.failure( + "Not enough dictionary space to execute this query. Try increasing " + + "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting " + + "druid.query.groupBy.maxOnDiskStorage to a positive number." + ); + private static final AggregateResult HASHTABLE_FULL = AggregateResult.failure( + "Not enough aggregation table space to execute this query. 
Try increasing " + + "druid.processing.buffer.sizeBytes or enable disk spilling by setting " + + "druid.query.groupBy.maxOnDiskStorage to a positive number." + ); private static final int MIN_INITIAL_BUCKETS = 4; private static final int DEFAULT_INITIAL_BUCKETS = 1024; @@ -146,11 +156,13 @@ public class BufferGrouper implements Grouper } @Override - public boolean aggregate(KeyType key, int keyHash) + public AggregateResult aggregate(KeyType key, int keyHash) { final ByteBuffer keyBuffer = keySerde.toByteBuffer(key); if (keyBuffer == null) { - return false; + // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will + // be correct. + return DICTIONARY_FULL; } if (keyBuffer.remaining() != keySize) { @@ -178,7 +190,9 @@ public class BufferGrouper implements Grouper } if (bucket < 0) { - return false; + // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will + // be correct. + return HASHTABLE_FULL; } } @@ -203,11 +217,11 @@ public class BufferGrouper implements Grouper aggregators[i].aggregate(tableBuffer, offset + aggregatorOffsets[i]); } - return true; + return AggregateResult.ok(); } @Override - public boolean aggregate(final KeyType key) + public AggregateResult aggregate(final KeyType key) { return aggregate(key, Groupers.hash(key)); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 27b1016d0a6..aa4ee9a73c7 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -145,7 +145,7 @@ public class ConcurrentGrouper implements Grouper } @Override - public boolean aggregate(KeyType key, int keyHash) + public AggregateResult aggregate(KeyType key, int keyHash) { if (!initialized) { throw new ISE("Grouper is not initialized"); @@ -160,8 +160,8 @@ public class ConcurrentGrouper implements Grouper synchronized (hashBasedGrouper) { if (!spilling) { - if (hashBasedGrouper.aggregate(key, keyHash)) { - return true; + if (hashBasedGrouper.aggregate(key, keyHash).isOk()) { + return AggregateResult.ok(); } else { spilling = true; } @@ -179,7 +179,7 @@ public class ConcurrentGrouper implements Grouper } @Override - public boolean aggregate(KeyType key) + public AggregateResult aggregate(KeyType key) { return aggregate(key, Groupers.hash(key)); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 2bdb9d35443..fb0fe1327b9 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -52,10 +52,10 @@ import io.druid.query.QueryContextKeys; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; -import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; import java.io.Closeable; @@ -107,7 +107,7 
@@ public class GroupByMergingQueryRunnerV2 implements QueryRunner } @Override - public Sequence run(final Query queryParam, final Map responseContext) + public Sequence run(final Query queryParam, final Map responseContext) { final GroupByQuery query = (GroupByQuery) queryParam; final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); @@ -180,7 +180,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner throw new QueryInterruptedException(e); } - Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( + Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, false, null, @@ -192,21 +192,21 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner combiningAggregatorFactories ); final Grouper grouper = pair.lhs; - final Accumulator, Row> accumulator = pair.rhs; + final Accumulator accumulator = pair.rhs; grouper.init(); final ReferenceCountingResourceHolder> grouperHolder = ReferenceCountingResourceHolder.fromCloseable(grouper); resources.add(grouperHolder); - ListenableFuture> futures = Futures.allAsList( + ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( Iterables.transform( queryables, - new Function, ListenableFuture>() + new Function, ListenableFuture>() { @Override - public ListenableFuture apply(final QueryRunner input) + public ListenableFuture apply(final QueryRunner input) { if (input == null) { throw new ISE( @@ -214,21 +214,24 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ); } - ListenableFuture future = exec.submit( - new AbstractPrioritizedCallable(priority) + ListenableFuture future = exec.submit( + new AbstractPrioritizedCallable(priority) { @Override - public Boolean call() throws Exception + public AggregateResult call() throws Exception { try ( Releaser bufferReleaser = mergeBufferHolder.increment(); Releaser grouperReleaser = grouperHolder.increment() ) { - final Object retVal = input.run(queryForRunners, responseContext) - .accumulate(grouper, accumulator); + final AggregateResult retVal = input.run(queryForRunners, responseContext) + .accumulate( + AggregateResult.ok(), + accumulator + ); // The result will be ok if aggregation succeeded, or carry a failure reason if resources were exhausted.
- return retVal == grouper; + return retVal; } catch (QueryInterruptedException e) { throw e; @@ -295,7 +298,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private void waitForFutureCompletion( GroupByQuery query, - ListenableFuture> future, + ListenableFuture> future, long timeout ) { @@ -308,12 +311,12 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner throw new TimeoutException(); } - final List results = future.get(timeout, TimeUnit.MILLISECONDS); + final List results = future.get(timeout, TimeUnit.MILLISECONDS); - for (Boolean result : results) { - if (!result) { + for (AggregateResult result : results) { + if (!result.isOk()) { future.cancel(true); - throw GroupByQueryHelper.throwAccumulationResourceLimitExceededException(); + throw new ResourceLimitExceededException(result.getReason()); } } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index d7f99ab2327..a0f82b99630 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -299,7 +299,7 @@ outer: // Aggregate additional grouping for this row if (doAggregate) { keyBuffer.rewind(); - if (!grouper.aggregate(keyBuffer)) { + if (!grouper.aggregate(keyBuffer).isOk()) { // Buffer full while aggregating; break out and resume later currentRowWasPartiallyAggregated = true; break outer; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 0a4344585a6..97261aa0788 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -33,12 +33,12 @@ import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.FilteredSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; -import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; import io.druid.query.groupby.resource.GroupByQueryResource; @@ -139,7 +139,7 @@ public class GroupByRowProcessor closeOnExit.add(temporaryStorage); - Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( + Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, true, rowSignature, @@ -160,15 +160,12 @@ public class GroupByRowProcessor aggregatorFactories ); final Grouper grouper = pair.lhs; - final Accumulator, Row> accumulator = pair.rhs; + final Accumulator accumulator = pair.rhs; closeOnExit.add(grouper); - final Grouper retVal = filteredSequence.accumulate( - grouper, - accumulator - ); - if (retVal != grouper) { - throw GroupByQueryHelper.throwAccumulationResourceLimitExceededException(); + final AggregateResult retVal = filteredSequence.accumulate(AggregateResult.ok(), accumulator); + if (!retVal.isOk()) 
{ + throw new ResourceLimitExceededException(retVal.getReason()); } return RowBasedGrouperHelper.makeGrouperIterator( diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java index 2417e8961bc..1971c2b86b2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java @@ -60,9 +60,9 @@ public interface Grouper extends Closeable * @param key key object * @param keyHash result of {@link Groupers#hash(Object)} on the key * - * @return true if the row was aggregated, false if not due to hitting resource limits + * @return result that is ok if the row was aggregated, not ok if a resource limit was hit */ - boolean aggregate(KeyType key, int keyHash); + AggregateResult aggregate(KeyType key, int keyHash); /** * Aggregate the current row with the provided key. Some implementations are thread-safe and @@ -70,9 +70,9 @@ public interface Grouper extends Closeable * * @param key key * - * @return true if the row was aggregated, false if not due to hitting resource limits + * @return result that is ok if the row was aggregated, not ok if a resource limit was hit */ - boolean aggregate(KeyType key); + AggregateResult aggregate(KeyType key); /** * Reset the grouper to its initial state. diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 623513ed8aa..9a79fd94cca 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -72,7 +72,7 @@ public class RowBasedGrouperHelper * been applied to the input rows yet, for example, in a nested query, if an extraction function is being * applied in the outer query to a field of the inner query. This method must apply those transformations. */ - public static Pair, Accumulator, Row>> createGrouperAccumulatorPair( + public static Pair, Accumulator> createGrouperAccumulatorPair( final GroupByQuery query, final boolean isInputRaw, final Map rawInputRowSignature, @@ -144,23 +144,23 @@ public class RowBasedGrouperHelper valueTypes ); - final Accumulator, Row> accumulator = new Accumulator, Row>() + final Accumulator accumulator = new Accumulator() { @Override - public Grouper accumulate( - final Grouper theGrouper, + public AggregateResult accumulate( + final AggregateResult priorResult, final Row row ) { BaseQuery.checkInterrupted(); - if (theGrouper == null) { - // Pass-through null returns without doing more work. - return null; + if (priorResult != null && !priorResult.isOk()) { + // Pass-through error returns without doing more work. + return priorResult; } - if (!theGrouper.isInitialized()) { - theGrouper.init(); + if (!grouper.isInitialized()) { + grouper.init(); } columnSelectorRow.set(row); @@ -168,14 +168,10 @@ public class RowBasedGrouperHelper final Comparable[] key = new Comparable[keySize]; valueExtractFn.apply(row, key); - final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(key)); - if (!didAggregate) { - // null return means grouping resources were exhausted. 
- return null; - } + final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key)); columnSelectorRow.set(null); - return theGrouper; + return aggregateResult; } }; @@ -346,7 +342,7 @@ public class RowBasedGrouperHelper Object dimVal = entry.getKey().getKey()[i]; theMap.put( query.getDimensions().get(i - dimStart).getOutputName(), - dimVal instanceof String ? Strings.emptyToNull((String)dimVal) : dimVal + dimVal instanceof String ? Strings.emptyToNull((String) dimVal) : dimVal ); } @@ -636,7 +632,7 @@ public class RowBasedGrouperHelper } return 0; - }; + } } private static class RowBasedKeySerde implements Grouper.KeySerde @@ -935,6 +931,7 @@ public class RowBasedGrouperHelper * * @param key RowBasedKey containing the grouping key values for a row. * @param idx Index of the grouping key column within that key that this SerdeHelper handles + * * @return true if the value was added to the key, false otherwise */ boolean putToKeyBuffer(RowBasedKey key, int idx); @@ -945,11 +942,11 @@ public class RowBasedGrouperHelper * * The value to be read resides in the buffer at position (`initialOffset` + the SerdeHelper's keyBufferPosition). * - * @param buffer ByteBuffer containing an array of grouping keys for a row + * @param buffer ByteBuffer containing an array of grouping keys for a row * @param initialOffset Offset where non-timestamp grouping key columns start, needed because timestamp is not * always included in the buffer. - * @param dimValIdx Index within dimValues to store the value read from the buffer - * @param dimValues Output array containing grouping key values for a row + * @param dimValIdx Index within dimValues to store the value read from the buffer + * @param dimValues Output array containing grouping key values for a row */ void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues); @@ -957,10 +954,11 @@ public class RowBasedGrouperHelper * Compare the values at lhsBuffer[lhsPosition] and rhsBuffer[rhsPosition] using the natural ordering * for this SerdeHelper's value type. * - * @param lhsBuffer ByteBuffer containing an array of grouping keys for a row - * @param rhsBuffer ByteBuffer containing an array of grouping keys for a row + * @param lhsBuffer ByteBuffer containing an array of grouping keys for a row + * @param rhsBuffer ByteBuffer containing an array of grouping keys for a row * @param lhsPosition Position of value within lhsBuffer * @param rhsPosition Position of value within rhsBuffer + * * @return Negative number if lhs < rhs, positive if lhs > rhs, 0 if lhs == rhs */ int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java index 47962d23c9f..21490173cba 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -51,6 +51,10 @@ import java.util.List; */ public class SpillingGrouper implements Grouper { + private static final AggregateResult DISK_FULL = AggregateResult.failure( + "Not enough disk space to execute this query. Try raising druid.query.groupBy.maxOnDiskStorage."
+ ); + private final BufferGrouper grouper; private final KeySerde keySerde; private final LimitedTemporaryStorage temporaryStorage; @@ -106,29 +110,31 @@ public class SpillingGrouper implements Grouper } @Override - public boolean aggregate(KeyType key, int keyHash) + public AggregateResult aggregate(KeyType key, int keyHash) { - if (grouper.aggregate(key, keyHash)) { - return true; - } else if (spillingAllowed) { + final AggregateResult result = grouper.aggregate(key, keyHash); + + if (result.isOk() || temporaryStorage.maxSize() <= 0 || !spillingAllowed) { + return result; + } else { // Warning: this can potentially block up a processing thread for a while. try { spill(); } catch (TemporaryStorageFullException e) { - return false; + return DISK_FULL; } catch (IOException e) { throw Throwables.propagate(e); } + + // Try again. return grouper.aggregate(key, keyHash); - } else { - return false; } } @Override - public boolean aggregate(KeyType key) + public AggregateResult aggregate(KeyType key) { return aggregate(key, Groupers.hash(key)); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 07a4b876c67..9d88c8378c6 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -1263,7 +1263,112 @@ public class GroupByQueryRunnerTest List expectedResults = null; if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough resources to execute this query"); + expectedException.expectMessage("Not enough aggregation table space to execute this query"); + } else { + expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); + } + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testNotEnoughDictionarySpaceThroughContextOverride() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setContext(ImmutableMap.of("maxOnDiskStorage", 0, "maxMergingDictionarySize", 1)) + .build(); + + List expectedResults = null; + if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + expectedException.expect(ResourceLimitExceededException.class); + expectedException.expectMessage("Not enough dictionary space to execute this query"); + } else { + expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); + } + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } 
+ + @Test + public void testNotEnoughDiskSpaceThroughContextOverride() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setContext(ImmutableMap.of("maxOnDiskStorage", 1, "maxMergingDictionarySize", 1)) + .build(); + + List expectedResults = null; + if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + expectedException.expect(ResourceLimitExceededException.class); + if (config.getMaxOnDiskStorage() > 0) { + // The error message always mentions disk if you have spilling enabled (maxOnDiskStorage > 0) + expectedException.expectMessage("Not enough disk space to execute this query"); + } else { + expectedException.expectMessage("Not enough dictionary space to execute this query"); + } } else { expectedResults = Arrays.asList( GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), @@ -1336,7 +1441,7 @@ public class GroupByQueryRunnerTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } else { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough resources to execute this query"); + expectedException.expectMessage("Not enough aggregation table space to execute this query"); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } } diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java index 97e9ae3e658..e6605400cec 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java @@ -97,16 +97,16 @@ public class BufferGrouperTest columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk()); } - Assert.assertFalse(grouper.aggregate(expectedMaxSize)); + Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk()); // Aggregate slightly different row columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L))); for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk()); } - Assert.assertFalse(grouper.aggregate(expectedMaxSize)); + Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk()); final List> expected = Lists.newArrayList(); for (int i = 0; i < expectedMaxSize; i++) { @@ -125,16 +125,16 @@ public class BufferGrouperTest columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk()); } - Assert.assertFalse(grouper.aggregate(expectedMaxSize)); + Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk()); // Aggregate slightly different row columnSelectorFactory.setRow(new MapBasedRow(0, 
ImmutableMap.of("value", 11L))); for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk()); } - Assert.assertFalse(grouper.aggregate(expectedMaxSize)); + Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk()); final List> expected = Lists.newArrayList(); for (int i = 0; i < expectedMaxSize; i++) {