allow groupBy max[Intermediate]Rows limits to be overridden by query context

Himanshu Gupta 2016-03-02 17:13:58 -06:00
parent 4fa08a1329
commit 099acb4966
6 changed files with 40 additions and 9 deletions

View File

@@ -68,8 +68,8 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.singleThreaded`|Run single threaded group By queries.|false|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results.|500000|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows. This can be overridden at query time by the `maxIntermediateRows` attribute in the query context.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. This can be overridden at query time by the `maxResults` attribute in the query context.|500000|
##### Search Query Config

View File

@@ -66,8 +66,8 @@ Druid uses Jetty to serve HTTP requests.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.singleThreaded`|Run single threaded group By queries.|false|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results.|500000|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows. This can be overridden at query time by the `maxIntermediateRows` attribute in the query context.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. This can be overridden at query time by the `maxResults` attribute in the query context.|500000|
##### Search Query Config

View File

@@ -18,3 +18,6 @@ The query context is used for various query configuration parameters.
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
|minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. |
|`maxResults`|`500000`|Maximum number of results a groupBy query can process.|
|`maxIntermediateRows`|`50000`|Maximum number of intermediate rows while processing a single segment of a groupBy query.|
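The two context keys map one-to-one to the `druid.query.groupBy.maxResults` and `druid.query.groupBy.maxIntermediateRows` properties documented above. As a reference, here is a minimal sketch of a query that lowers both limits per query; it is modeled on the test added in this commit, and `QueryRunnerTestHelper`, `DefaultDimensionSpec`, and `LongSumAggregatorFactory` are assumed to be available from Druid's test sources exactly as they are used in that test.

```java
// Sketch only: mirrors the test added in this commit; helper constants are assumptions
// taken from Druid's test sources, not part of this change.
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;

import java.util.Arrays;

public class GroupByContextOverrideExample
{
  public static GroupByQuery buildQuery()
  {
    return GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("idx", "index")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        // Per-query overrides; when a key is absent, the server falls back to
        // druid.query.groupBy.maxResults / druid.query.groupBy.maxIntermediateRows.
        .setContext(
            ImmutableMap.<String, Object>of(
                "maxResults", 10000,
                "maxIntermediateRows", 5000
            )
        )
        .build();
  }
}
```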

View File

@@ -66,6 +66,8 @@ import java.util.NoSuchElementException;
*/
public class GroupByQueryEngine
{
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private final Supplier<GroupByQueryConfig> config;
private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
@@ -289,7 +291,7 @@ public class GroupByQueryEngine
private final GroupByQuery query;
private final Cursor cursor;
private final ByteBuffer metricsBuffer;
private final GroupByQueryConfig config;
private final int maxIntermediateRows;
private final List<DimensionSpec> dimensionSpecs;
private final List<DimensionSelector> dimensions;
@@ -307,7 +309,8 @@ public class GroupByQueryEngine
this.query = query;
this.cursor = cursor;
this.metricsBuffer = metricsBuffer;
this.config = config;
this.maxIntermediateRows = query.getContextValue(CTX_KEY_MAX_INTERMEDIATE_ROWS, config.getMaxIntermediateRows());
unprocessedKeys = null;
delegate = Iterators.emptyIterator();
@@ -364,7 +367,7 @@ public class GroupByQueryEngine
}
cursor.advance();
}
while (!cursor.isDone() && rowUpdater.getNumRows() < config.getMaxIntermediateRows()) {
while (!cursor.isDone() && rowUpdater.getNumRows() < maxIntermediateRows) {
ByteBuffer key = ByteBuffer.allocate(dimensions.size() * Ints.BYTES);
unprocessedKeys = rowUpdater.updateValues(key, dimensions);
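The hunk above replaces the fixed `config.getMaxIntermediateRows()` bound with a per-query value read from the context, falling back to the config default when the key is absent. Below is a minimal, self-contained sketch of that "context value overrides config default" pattern; it is not Druid code, and everything except the `maxIntermediateRows` key name is hypothetical.

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch of the override-with-fallback lookup used in the hunk above.
public class ContextOverrideSketch
{
  private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
  private static final int CONFIG_MAX_INTERMEDIATE_ROWS = 50000; // stand-in for the server config default

  static int maxIntermediateRows(Map<String, Object> queryContext)
  {
    final Object override = queryContext.get(CTX_KEY_MAX_INTERMEDIATE_ROWS);
    if (override == null) {
      return CONFIG_MAX_INTERMEDIATE_ROWS;           // no context override: use the server-wide default
    }
    return override instanceof Number
           ? ((Number) override).intValue()          // context set programmatically as a number
           : Integer.parseInt(override.toString());  // context deserialized from JSON as a string
  }

  public static void main(String[] args)
  {
    // prints 1000 (context override) and 50000 (config default)
    System.out.println(maxIntermediateRows(Collections.<String, Object>singletonMap(CTX_KEY_MAX_INTERMEDIATE_ROWS, 1000)));
    System.out.println(maxIntermediateRows(Collections.<String, Object>emptyMap()));
  }
}
```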

View File

@@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class GroupByQueryHelper
{
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config,
@@ -88,7 +90,7 @@ public class GroupByQueryHelper
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
config.getMaxResults(),
query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()),
bufferPool
);
} else {
@@ -100,7 +102,7 @@ public class GroupByQueryHelper
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
config.getMaxResults()
query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults())
);
}

View File

@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.parsers.ParseException;
@@ -255,6 +256,28 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test(expected = ISE.class)
public void testGroupByMaxRowsLimitContextOverride()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setContext(ImmutableMap.<String, Object>of("maxResults", 1))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
@Test
public void testGroupByWithRebucketRename()
{