diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 41b1a18ce30..83542e03bae 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -68,8 +68,8 @@ The broker uses processing configs for nested groupBy queries. And, optionally, |Property|Description|Default| |--------|-----------|-------| |`druid.query.groupBy.singleThreaded`|Run single threaded group By queries.|false| -|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows.|50000| -|`druid.query.groupBy.maxResults`|Maximum number of results.|500000| +|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows. This can be overriden at query time by `maxIntermediateRows` attribute in query context.|50000| +|`druid.query.groupBy.maxResults`|Maximum number of results. This can be overriden at query time by `maxResults` attribute in query context.|500000| ##### Search Query Config diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 57264a844ea..ce07a5494db 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -66,8 +66,8 @@ Druid uses Jetty to serve HTTP requests. |Property|Description|Default| |--------|-----------|-------| |`druid.query.groupBy.singleThreaded`|Run single threaded group By queries.|false| -|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows.|50000| -|`druid.query.groupBy.maxResults`|Maximum number of results.|500000| +|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows. This can be overriden at query time by `maxIntermediateRows` attribute in query context.|50000| +|`druid.query.groupBy.maxResults`|Maximum number of results. This can be overriden at query time by `maxResults` attribute in query context.|500000| ##### Search Query Config diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 7b4707013c1..eff84069ee0 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -18,3 +18,6 @@ The query context is used for various query configuration parameters. |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. | |minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. | +|`maxResults`|500000|Maximum number of results groupBy query can process.| +|`maxIntermediateRows`|50000|Maximum number of intermediate rows while processing single segment for groupBy query.| + diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index afd2739b566..3674703c975 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -66,6 +66,8 @@ import java.util.NoSuchElementException; */ public class GroupByQueryEngine { + private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows"; + private final Supplier config; private final StupidPool intermediateResultsBufferPool; @@ -289,7 +291,7 @@ public class GroupByQueryEngine private final GroupByQuery query; private final Cursor cursor; private final ByteBuffer metricsBuffer; - private final GroupByQueryConfig config; + private final int maxIntermediateRows; private final List dimensionSpecs; private final List dimensions; @@ -307,7 +309,8 @@ public class GroupByQueryEngine this.query = query; this.cursor = cursor; this.metricsBuffer = metricsBuffer; - this.config = config; + + this.maxIntermediateRows = query.getContextValue(CTX_KEY_MAX_INTERMEDIATE_ROWS, config.getMaxIntermediateRows()); unprocessedKeys = null; delegate = Iterators.emptyIterator(); @@ -364,7 +367,7 @@ public class GroupByQueryEngine } cursor.advance(); } - while (!cursor.isDone() && rowUpdater.getNumRows() < config.getMaxIntermediateRows()) { + while (!cursor.isDone() && rowUpdater.getNumRows() < maxIntermediateRows) { ByteBuffer key = ByteBuffer.allocate(dimensions.size() * Ints.BYTES); unprocessedKeys = rowUpdater.updateValues(key, dimensions); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index f47f151903e..2ebd302442d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class GroupByQueryHelper { + private static final String CTX_KEY_MAX_RESULTS = "maxResults"; + public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config, @@ -88,7 +90,7 @@ public class GroupByQueryHelper aggs.toArray(new AggregatorFactory[aggs.size()]), false, true, - config.getMaxResults(), + query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), bufferPool ); } else { @@ -100,7 +102,7 @@ public class GroupByQueryHelper aggs.toArray(new AggregatorFactory[aggs.size()]), false, true, - config.getMaxResults() + query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()) ); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 9a4f2712d85..ac3fd8956e1 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.parsers.ParseException; @@ -255,6 +256,28 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, results, ""); } + + @Test(expected = ISE.class) + public void testGroupByMaxRowsLimitContextOverrid() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setContext(ImmutableMap.of("maxResults", 1)) + .build(); + + GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + } + @Test public void testGroupByWithRebucketRename() {