Better groupBy error messages and docs around resource limits. (#4162)

* Better groupBy error messages and docs around resource limits.

* Fix BufferGrouper test from datasketches.

* Further clarify.
Gian Merlino 2017-04-14 02:38:53 +09:00 committed by Fangjin Yang
parent 2e9589215e
commit b2954d5fea
14 changed files with 327 additions and 99 deletions

View File

@@ -141,8 +141,11 @@ on-heap by default, but it can optionally store aggregated values off-heap.
 Query API and results are compatible between the two engines; however, there are some differences from a cluster
 configuration perspective:
-- groupBy v1 merges results in heap, whereas groupBy v2 merges results off-heap. As a result, optimal configuration for
-your Druid nodes may involve less heap (-Xmx, -Xms) and more direct memory (-XX:MaxDirectMemorySize).
+- groupBy v1 controls resource usage with a row-based limit (maxResults), whereas groupBy v2 uses bytes-based limits.
+In addition, groupBy v1 merges results on-heap, whereas groupBy v2 merges results off-heap. These factors mean that
+memory tuning and resource limits behave differently between v1 and v2. In particular, some queries that complete
+successfully in one engine may exceed resource limits and fail with the other. See the "Memory tuning and resource
+limits" section below for more details.
 - groupBy v1 imposes no limit on the number of concurrently running queries, whereas groupBy v2 controls memory usage
 by using a finite-sized merge buffer pool. By default, the number of merge buffers is 1/4 the number of processing
 threads. You can adjust this as necessary to balance concurrency and memory usage.
@@ -151,6 +154,37 @@ historical nodes.
 - groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2
 ignores chunkPeriod.
+#### Memory tuning and resource limits
+
+When using groupBy v2, three parameters control resource usage and limits:
+
+- druid.processing.buffer.sizeBytes: size of the off-heap hash table used for aggregation, per query, in bytes. At
+most druid.processing.numMergeBuffers of these will be created at once, which also serves as an upper limit on the
+number of concurrently running groupBy queries.
+- druid.query.groupBy.maxMergingDictionarySize: size of the on-heap dictionary used when grouping on strings, per query,
+in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size.
+- druid.query.groupBy.maxOnDiskStorage: amount of space on disk used for aggregation, per query, in bytes. By default,
+this is 0, which means aggregation will not use disk.
+
+If maxOnDiskStorage is 0 (the default), then a query that exceeds either the on-heap dictionary limit or the off-heap
+aggregation table limit will fail with a "Resource limit exceeded" error describing the limit that was exceeded.
+
+If maxOnDiskStorage is greater than 0, queries that exceed the in-memory limits will start using disk for aggregation.
+In this case, when either the on-heap dictionary or the off-heap hash table fills up, partially aggregated records will
+be sorted and flushed to disk. Then, both in-memory structures will be cleared out for further aggregation. Queries
+that then go on to exceed maxOnDiskStorage will fail with a "Resource limit exceeded" error indicating that they ran
+out of disk space.
+
+With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
+will not exceed available memory for the maximum possible concurrent query load (given by
+druid.processing.numMergeBuffers).
+
+When using groupBy v1, all aggregation is done on-heap, and resource limits are enforced through the parameter
+druid.query.groupBy.maxResults. This is a cap on the maximum number of results in a result set. Queries that exceed
+this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster
+operators should make sure that the on-heap aggregations will not exceed available JVM heap space for the expected
+concurrent query load.
 #### Alternatives
 There are some situations where other query types may be a better choice than groupBy.
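The section added above describes server-side properties; the tests added later in this commit exercise the same limits per query through the context keys maxOnDiskStorage and maxMergingDictionarySize. As a minimal sketch (not part of this commit), here is how such a context map could be built; the class name and the limit values are illustrative only:

import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class GroupByV2LimitOverrides
{
  // Per-query overrides for the groupBy v2 limits described above. The keys
  // match the context names used by this commit's new tests; the values are
  // illustrative, not tuning advice.
  public static Map<String, Object> spillingEnabledContext()
  {
    return ImmutableMap.<String, Object>of(
        "maxOnDiskStorage", 1000000000L,        // allow ~1 GB of per-query disk spill
        "maxMergingDictionarySize", 100000000L  // ~100 MB cap on the estimated dictionary size
    );
  }
}

A map built this way would be attached with GroupByQuery.builder()...setContext(...), exactly as the new tests below do.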

View File

@@ -75,14 +75,14 @@ public class BufferGrouperUsingSketchMergeAggregatorFactoryTest
 columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));
 for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
 }

 updateSketch.update(3);
 columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));
 for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
 }

 Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues();

View File

@@ -256,18 +256,4 @@ public class GroupByQueryHelper
 // Don't include post-aggregators since we don't know what types they are.
 return types.build();
 }
-
-/**
- * Throw a {@link ResourceLimitExceededException}. Only used by groupBy v2 when accumulation resources
- * are exceeded, triggered by false return from {@link io.druid.query.groupby.epinephelinae.Grouper#aggregate(Object)}.
- *
- * @return nothing will ever be returned; this return type is for your convenience, similar to
- * Throwables.propagate in Guava.
- */
-public static ResourceLimitExceededException throwAccumulationResourceLimitExceededException()
-{
-throw new ResourceLimitExceededException(
-"Not enough resources to execute this query. Try increasing druid.query.groupBy.maxOnDiskStorage, "
-+ "druid.query.groupBy.maxMergingDictionarySize, or druid.processing.buffer.sizeBytes.");
-}
 }

View File

@@ -0,0 +1,85 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.groupby.epinephelinae;
+
+import java.util.Objects;
+
+public class AggregateResult
+{
+  private static final AggregateResult OK = new AggregateResult(true, null);
+
+  private final boolean ok;
+  private final String reason;
+
+  public static AggregateResult ok()
+  {
+    return OK;
+  }
+
+  public static AggregateResult failure(final String reason)
+  {
+    return new AggregateResult(false, reason);
+  }
+
+  private AggregateResult(final boolean ok, final String reason)
+  {
+    this.ok = ok;
+    this.reason = reason;
+  }
+
+  public boolean isOk()
+  {
+    return ok;
+  }
+
+  public String getReason()
+  {
+    return reason;
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final AggregateResult that = (AggregateResult) o;
+    return ok == that.ok &&
+           Objects.equals(reason, that.reason);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(ok, reason);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AggregateResult{" +
+           "ok=" + ok +
+           ", reason='" + reason + '\'' +
+           '}';
+  }
+}
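A minimal usage sketch of the new result type: callers branch on isOk() and surface getReason() through ResourceLimitExceededException, as the merging runner and GroupByRowProcessor do later in this commit. The helper class and method names below are hypothetical:

import io.druid.query.ResourceLimitExceededException;
import io.druid.query.groupby.epinephelinae.AggregateResult;

public class AggregateResultExample
{
  // Hypothetical helper: rethrow a failed result the way this commit's
  // callers do, preserving the failure's own message.
  static void checkOrThrow(final AggregateResult result)
  {
    if (!result.isOk()) {
      throw new ResourceLimitExceededException(result.getReason());
    }
  }

  public static void main(String[] args)
  {
    checkOrThrow(AggregateResult.ok());                    // passes silently
    checkOrThrow(AggregateResult.failure("out of space")); // throws with that reason
  }
}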

View File

@@ -59,6 +59,16 @@ import java.util.List;
 public class BufferGrouper<KeyType> implements Grouper<KeyType>
 {
 private static final Logger log = new Logger(BufferGrouper.class);
+
+private static final AggregateResult DICTIONARY_FULL = AggregateResult.failure(
+"Not enough dictionary space to execute this query. Try increasing "
++ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
++ "druid.query.groupBy.maxOnDiskStorage to a positive number."
+);
+private static final AggregateResult HASHTABLE_FULL = AggregateResult.failure(
+"Not enough aggregation table space to execute this query. Try increasing "
++ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
++ "druid.query.groupBy.maxOnDiskStorage to a positive number."
+);
+
 private static final int MIN_INITIAL_BUCKETS = 4;
 private static final int DEFAULT_INITIAL_BUCKETS = 1024;

@@ -146,11 +156,13 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
 }

 @Override
-public boolean aggregate(KeyType key, int keyHash)
+public AggregateResult aggregate(KeyType key, int keyHash)
 {
 final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
 if (keyBuffer == null) {
-return false;
+// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
+// be correct.
+return DICTIONARY_FULL;
 }

 if (keyBuffer.remaining() != keySize) {
@@ -178,7 +190,9 @@
 }

 if (bucket < 0) {
-return false;
+// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
+// be correct.
+return HASHTABLE_FULL;
 }
 }

@@ -203,11 +217,11 @@
 aggregators[i].aggregate(tableBuffer, offset + aggregatorOffsets[i]);
 }

-return true;
+return AggregateResult.ok();
 }

 @Override
-public boolean aggregate(final KeyType key)
+public AggregateResult aggregate(final KeyType key)
 {
 return aggregate(key, Groupers.hash(key));
 }

View File

@@ -145,7 +145,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
 }

 @Override
-public boolean aggregate(KeyType key, int keyHash)
+public AggregateResult aggregate(KeyType key, int keyHash)
 {
 if (!initialized) {
 throw new ISE("Grouper is not initialized");
@@ -160,8 +160,8 @@
 synchronized (hashBasedGrouper) {
 if (!spilling) {
-if (hashBasedGrouper.aggregate(key, keyHash)) {
-return true;
+if (hashBasedGrouper.aggregate(key, keyHash).isOk()) {
+return AggregateResult.ok();
 } else {
 spilling = true;
 }
@@ -179,7 +179,7 @@
 }

 @Override
-public boolean aggregate(KeyType key)
+public AggregateResult aggregate(KeyType key)
 {
 return aggregate(key, Groupers.hash(key));
 }

View File

@@ -52,10 +52,10 @@ import io.druid.query.QueryContextKeys;
 import io.druid.query.QueryInterruptedException;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryWatcher;
+import io.druid.query.ResourceLimitExceededException;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
-import io.druid.query.groupby.GroupByQueryHelper;
 import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;

 import java.io.Closeable;
@@ -107,7 +107,7 @@
 }

 @Override
-public Sequence<Row> run(final Query queryParam, final Map responseContext)
+public Sequence<Row> run(final Query<Row> queryParam, final Map<String, Object> responseContext)
 {
 final GroupByQuery query = (GroupByQuery) queryParam;
 final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
@@ -180,7 +180,7 @@
 throw new QueryInterruptedException(e);
 }

-Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
+Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
 query,
 false,
 null,
@@ -192,21 +192,21 @@
 combiningAggregatorFactories
 );
 final Grouper<RowBasedKey> grouper = pair.lhs;
-final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
+final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
 grouper.init();

 final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
 ReferenceCountingResourceHolder.fromCloseable(grouper);
 resources.add(grouperHolder);

-ListenableFuture<List<Boolean>> futures = Futures.allAsList(
+ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
 Lists.newArrayList(
 Iterables.transform(
 queryables,
-new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
+new Function<QueryRunner<Row>, ListenableFuture<AggregateResult>>()
 {
 @Override
-public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
+public ListenableFuture<AggregateResult> apply(final QueryRunner<Row> input)
 {
 if (input == null) {
 throw new ISE(
@@ -214,21 +214,24 @@
 );
 }

-ListenableFuture<Boolean> future = exec.submit(
-new AbstractPrioritizedCallable<Boolean>(priority)
+ListenableFuture<AggregateResult> future = exec.submit(
+new AbstractPrioritizedCallable<AggregateResult>(priority)
 {
 @Override
-public Boolean call() throws Exception
+public AggregateResult call() throws Exception
 {
 try (
 Releaser bufferReleaser = mergeBufferHolder.increment();
 Releaser grouperReleaser = grouperHolder.increment()
 ) {
-final Object retVal = input.run(queryForRunners, responseContext)
-.accumulate(grouper, accumulator);
+final AggregateResult retVal = input.run(queryForRunners, responseContext)
+.accumulate(
+AggregateResult.ok(),
+accumulator
+);
-// Return true if OK, false if resources were exhausted.
-return retVal == grouper;
+// Return the result as-is: ok if aggregation succeeded, a failure naming the exhausted resource otherwise.
+return retVal;
 }
 catch (QueryInterruptedException e) {
 throw e;
@@ -295,7 +298,7 @@
 private void waitForFutureCompletion(
 GroupByQuery query,
-ListenableFuture<List<Boolean>> future,
+ListenableFuture<List<AggregateResult>> future,
 long timeout
 )
 {
@@ -308,12 +311,12 @@
 throw new TimeoutException();
 }

-final List<Boolean> results = future.get(timeout, TimeUnit.MILLISECONDS);
-for (Boolean result : results) {
-if (!result) {
+final List<AggregateResult> results = future.get(timeout, TimeUnit.MILLISECONDS);
+for (AggregateResult result : results) {
+if (!result.isOk()) {
 future.cancel(true);
-throw GroupByQueryHelper.throwAccumulationResourceLimitExceededException();
+throw new ResourceLimitExceededException(result.getReason());
 }
 }
 }
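For readers unfamiliar with Druid's Sequence API, a minimal sketch of the fold the merging runner now performs: accumulation starts from AggregateResult.ok(), and a prior failure is passed through untouched, so one exhausted resource effectively short-circuits the rest of the sequence. Sequences.simple and the Accumulator signature follow the java-util imports visible elsewhere in this commit; treat the exact details as assumptions:

import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;

import java.util.Arrays;

public class AccumulateSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the per-segment result rows the real runner consumes.
    final Sequence<Integer> rows = Sequences.simple(Arrays.asList(1, 2, 3));

    final AggregateResult result = rows.accumulate(
        AggregateResult.ok(),
        new Accumulator<AggregateResult, Integer>()
        {
          @Override
          public AggregateResult accumulate(final AggregateResult prior, final Integer row)
          {
            if (!prior.isOk()) {
              // Pass failures through without doing more work, as the real
              // accumulator in RowBasedGrouperHelper does.
              return prior;
            }
            // A real implementation would aggregate the row into a grouper
            // here and return that call's AggregateResult.
            return AggregateResult.ok();
          }
        }
    );

    if (!result.isOk()) {
      System.err.println("Resource limit exceeded: " + result.getReason());
    }
  }
}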

View File

@@ -299,7 +299,7 @@ outer:
 // Aggregate additional grouping for this row
 if (doAggregate) {
 keyBuffer.rewind();
-if (!grouper.aggregate(keyBuffer)) {
+if (!grouper.aggregate(keyBuffer).isOk()) {
 // Buffer full while aggregating; break out and resume later
 currentRowWasPartiallyAggregated = true;
 break outer;

View File

@@ -33,12 +33,12 @@ import io.druid.java.util.common.guava.CloseQuietly;
 import io.druid.java.util.common.guava.FilteredSequence;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.query.Query;
+import io.druid.query.ResourceLimitExceededException;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.filter.Filter;
 import io.druid.query.filter.ValueMatcher;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
-import io.druid.query.groupby.GroupByQueryHelper;
 import io.druid.query.groupby.RowBasedColumnSelectorFactory;
 import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
 import io.druid.query.groupby.resource.GroupByQueryResource;
@@ -139,7 +139,7 @@ public class GroupByRowProcessor
 closeOnExit.add(temporaryStorage);

-Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
+Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
 query,
 true,
 rowSignature,
@@ -160,15 +160,12 @@
 aggregatorFactories
 );
 final Grouper<RowBasedKey> grouper = pair.lhs;
-final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
+final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
 closeOnExit.add(grouper);

-final Grouper<RowBasedKey> retVal = filteredSequence.accumulate(
-grouper,
-accumulator
-);
-if (retVal != grouper) {
-throw GroupByQueryHelper.throwAccumulationResourceLimitExceededException();
+final AggregateResult retVal = filteredSequence.accumulate(AggregateResult.ok(), accumulator);
+if (!retVal.isOk()) {
+throw new ResourceLimitExceededException(retVal.getReason());
 }

 return RowBasedGrouperHelper.makeGrouperIterator(

View File

@@ -60,9 +60,9 @@ public interface Grouper<KeyType> extends Closeable
 * @param key key object
 * @param keyHash result of {@link Groupers#hash(Object)} on the key
 *
-* @return true if the row was aggregated, false if not due to hitting resource limits
+* @return result that is ok if the row was aggregated, not ok if a resource limit was hit
 */
-boolean aggregate(KeyType key, int keyHash);
+AggregateResult aggregate(KeyType key, int keyHash);

 /**
 * Aggregate the current row with the provided key. Some implementations are thread-safe and
@@ -70,9 +70,9 @@
 *
 * @param key key
 *
-* @return true if the row was aggregated, false if not due to hitting resource limits
+* @return result that is ok if the row was aggregated, not ok if a resource limit was hit
 */
-boolean aggregate(KeyType key);
+AggregateResult aggregate(KeyType key);

 /**
 * Reset the grouper to its initial state.
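To make the revised contract concrete, here is a toy, hypothetical grouper-like class (it does not implement the full Grouper interface) showing the intended return discipline: ok() when the row is aggregated, a failure describing the exhausted resource when it is not:

import java.util.HashMap;
import java.util.Map;

public class CappedCountingGrouper
{
  private static final AggregateResult TABLE_FULL =
      AggregateResult.failure("Not enough table space in this toy grouper.");

  private final Map<Integer, Long> counts = new HashMap<>();
  private final int maxEntries;

  public CappedCountingGrouper(final int maxEntries)
  {
    this.maxEntries = maxEntries;
  }

  public AggregateResult aggregate(final int key)
  {
    if (!counts.containsKey(key) && counts.size() >= maxEntries) {
      // Resource limit hit; a caller such as SpillingGrouper may spill and
      // retry, or bubble the failure up to fail the query.
      return TABLE_FULL;
    }
    counts.merge(key, 1L, Long::sum);
    return AggregateResult.ok();
  }
}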

View File

@@ -72,7 +72,7 @@ public class RowBasedGrouperHelper
 * been applied to the input rows yet, for example, in a nested query, if an extraction function is being
 * applied in the outer query to a field of the inner query. This method must apply those transformations.
 */
-public static Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> createGrouperAccumulatorPair(
+public static Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> createGrouperAccumulatorPair(
 final GroupByQuery query,
 final boolean isInputRaw,
 final Map<String, ValueType> rawInputRowSignature,
@@ -144,23 +144,23 @@
 valueTypes
 );

-final Accumulator<Grouper<RowBasedKey>, Row> accumulator = new Accumulator<Grouper<RowBasedKey>, Row>()
+final Accumulator<AggregateResult, Row> accumulator = new Accumulator<AggregateResult, Row>()
 {
 @Override
-public Grouper<RowBasedKey> accumulate(
-final Grouper<RowBasedKey> theGrouper,
+public AggregateResult accumulate(
+final AggregateResult priorResult,
 final Row row
 )
 {
 BaseQuery.checkInterrupted();

-if (theGrouper == null) {
-// Pass-through null returns without doing more work.
-return null;
+if (priorResult != null && !priorResult.isOk()) {
+// Pass-through error returns without doing more work.
+return priorResult;
 }

-if (!theGrouper.isInitialized()) {
-theGrouper.init();
+if (!grouper.isInitialized()) {
+grouper.init();
 }

 columnSelectorRow.set(row);
@@ -168,14 +168,10 @@
 final Comparable[] key = new Comparable[keySize];
 valueExtractFn.apply(row, key);

-final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(key));
-if (!didAggregate) {
-// null return means grouping resources were exhausted.
-return null;
-}
+final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key));
 columnSelectorRow.set(null);

-return theGrouper;
+return aggregateResult;
 }
 };
@@ -346,7 +342,7 @@
 Object dimVal = entry.getKey().getKey()[i];
 theMap.put(
 query.getDimensions().get(i - dimStart).getOutputName(),
-dimVal instanceof String ? Strings.emptyToNull((String)dimVal) : dimVal
+dimVal instanceof String ? Strings.emptyToNull((String) dimVal) : dimVal
 );
 }
@@ -636,7 +632,7 @@
 }
 return 0;
-};
+}
 }

 private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
@@ -935,6 +931,7 @@
 *
 * @param key RowBasedKey containing the grouping key values for a row.
 * @param idx Index of the grouping key column within that this SerdeHelper handles
+ *
 * @return true if the value was added to the key, false otherwise
 */
 boolean putToKeyBuffer(RowBasedKey key, int idx);
@@ -945,11 +942,11 @@
 *
 * The value to be read resides in the buffer at position (`initialOffset` + the SerdeHelper's keyBufferPosition).
 *
 * @param buffer ByteBuffer containing an array of grouping keys for a row
 * @param initialOffset Offset where non-timestamp grouping key columns start, needed because timestamp is not
 * always included in the buffer.
 * @param dimValIdx Index within dimValues to store the value read from the buffer
 * @param dimValues Output array containing grouping key values for a row
 */
 void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues);
@@ -957,10 +954,11 @@
 * Compare the values at lhsBuffer[lhsPosition] and rhsBuffer[rhsPosition] using the natural ordering
 * for this SerdeHelper's value type.
 *
 * @param lhsBuffer ByteBuffer containing an array of grouping keys for a row
 * @param rhsBuffer ByteBuffer containing an array of grouping keys for a row
 * @param lhsPosition Position of value within lhsBuffer
 * @param rhsPosition Position of value within rhsBuffer
+ *
 * @return Negative number if lhs < rhs, positive if lhs > rhs, 0 if lhs == rhs
 */
 int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition);

View File

@@ -51,6 +51,10 @@
 */
 public class SpillingGrouper<KeyType> implements Grouper<KeyType>
 {
+private static final AggregateResult DISK_FULL = AggregateResult.failure(
+"Not enough disk space to execute this query. Try raising druid.query.groupBy.maxOnDiskStorage."
+);
+
 private final BufferGrouper<KeyType> grouper;
 private final KeySerde<KeyType> keySerde;
 private final LimitedTemporaryStorage temporaryStorage;

@@ -106,29 +110,31 @@
 }

 @Override
-public boolean aggregate(KeyType key, int keyHash)
+public AggregateResult aggregate(KeyType key, int keyHash)
 {
-if (grouper.aggregate(key, keyHash)) {
-return true;
-} else if (spillingAllowed) {
+final AggregateResult result = grouper.aggregate(key, keyHash);
+
+if (result.isOk() || temporaryStorage.maxSize() <= 0 || !spillingAllowed) {
+return result;
+} else {
 // Warning: this can potentially block up a processing thread for a while.
 try {
 spill();
 }
 catch (TemporaryStorageFullException e) {
-return false;
+return DISK_FULL;
 }
 catch (IOException e) {
 throw Throwables.propagate(e);
 }
+
+// Try again.
 return grouper.aggregate(key, keyHash);
-} else {
-return false;
 }
 }

 @Override
-public boolean aggregate(KeyType key)
+public AggregateResult aggregate(KeyType key)
 {
 return aggregate(key, Groupers.hash(key));
 }

View File

@@ -1263,7 +1263,112 @@ public class GroupByQueryRunnerTest
 List<Row> expectedResults = null;
 if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
 expectedException.expect(ResourceLimitExceededException.class);
-expectedException.expectMessage("Not enough resources to execute this query");
+expectedException.expectMessage("Not enough aggregation table space to execute this query");
+} else {
+expectedResults = Arrays.asList(
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
+);
+}
+
+Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+TestHelper.assertExpectedObjects(expectedResults, results, "");
+}
+
+@Test
+public void testNotEnoughDictionarySpaceThroughContextOverride()
+{
+GroupByQuery query = GroupByQuery
+.builder()
+.setDataSource(QueryRunnerTestHelper.dataSource)
+.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+.setAggregatorSpecs(
+Arrays.asList(
+QueryRunnerTestHelper.rowsCount,
+new LongSumAggregatorFactory("idx", "index")
+)
+)
+.setGranularity(QueryRunnerTestHelper.dayGran)
+.setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "maxMergingDictionarySize", 1))
+.build();
+
+List<Row> expectedResults = null;
+if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+expectedException.expect(ResourceLimitExceededException.class);
+expectedException.expectMessage("Not enough dictionary space to execute this query");
+} else {
+expectedResults = Arrays.asList(
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
+GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
+);
+}
+
+Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+TestHelper.assertExpectedObjects(expectedResults, results, "");
+}
+
+@Test
+public void testNotEnoughDiskSpaceThroughContextOverride()
+{
+GroupByQuery query = GroupByQuery
+.builder()
+.setDataSource(QueryRunnerTestHelper.dataSource)
+.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+.setAggregatorSpecs(
+Arrays.asList(
+QueryRunnerTestHelper.rowsCount,
+new LongSumAggregatorFactory("idx", "index")
+)
+)
+.setGranularity(QueryRunnerTestHelper.dayGran)
+.setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 1, "maxMergingDictionarySize", 1))
+.build();
+
+List<Row> expectedResults = null;
+if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+expectedException.expect(ResourceLimitExceededException.class);
+if (config.getMaxOnDiskStorage() > 0) {
+// The error message always mentions disk if you have spilling enabled (maxOnDiskStorage > 0)
+expectedException.expectMessage("Not enough disk space to execute this query");
+} else {
+expectedException.expectMessage("Not enough dictionary space to execute this query");
+}
 } else {
 expectedResults = Arrays.asList(
 GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
@@ -1336,7 +1441,7 @@
 GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
 } else {
 expectedException.expect(ResourceLimitExceededException.class);
-expectedException.expectMessage("Not enough resources to execute this query");
+expectedException.expectMessage("Not enough aggregation table space to execute this query");
 GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
 }
 }

View File

@@ -97,16 +97,16 @@ public class BufferGrouperTest
 columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L)));
 for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
 }
-Assert.assertFalse(grouper.aggregate(expectedMaxSize));
+Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());

 // Aggregate slightly different row
 columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 11L)));
 for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
 }
-Assert.assertFalse(grouper.aggregate(expectedMaxSize));
+Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());

 final List<Grouper.Entry<Integer>> expected = Lists.newArrayList();
 for (int i = 0; i < expectedMaxSize; i++) {
@@ -125,16 +125,16 @@
 columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L)));
 for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
 }
-Assert.assertFalse(grouper.aggregate(expectedMaxSize));
+Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());

 // Aggregate slightly different row
 columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 11L)));
 for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
 }
-Assert.assertFalse(grouper.aggregate(expectedMaxSize));
+Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());

 final List<Grouper.Entry<Integer>> expected = Lists.newArrayList();
 for (int i = 0; i < expectedMaxSize; i++) {