mirror of https://github.com/apache/druid.git
Merge pull request #2221 from binlijin/topN_minTopNThreshold
Allow change minTopNThreshold per topN query
This commit is contained in:
commit
d7ad93debc
|
@ -7,14 +7,14 @@ Query Context
|
|||
|
||||
The query context is used for various query configuration parameters.
|
||||
|
||||
|property |default | description |
|
||||
|--------------|---------------------|----------------------|
|
||||
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. |
|
||||
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|
||||
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|
||||
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|
||||
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|
||||
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|
||||
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|
||||
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
|
||||
|
||||
|property |default | description |
|
||||
|-----------------|---------------------|----------------------|
|
||||
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. |
|
||||
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|
||||
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|
||||
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|
||||
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|
||||
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|
||||
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|
||||
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
|
||||
|minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. |
|
||||
|
|
|
@ -131,7 +131,7 @@ The format of the results would look like so:
|
|||
### Aliasing
|
||||
The current TopN algorithm is an approximate algorithm. The top 1000 local results from each segment are returned for merging to determine the global topN. As such, the topN algorithm is approximate in both rank and results. Approximate results *ONLY APPLY WHEN THERE ARE MORE THAN 1000 DIM VALUES*. A topN over a dimension with fewer than 1000 unique dimension values can be considered accurate in rank and accurate in aggregates.
|
||||
|
||||
The threshold can be modified from it's default 1000 via the server parameter `druid.query.topN.minTopNThreshold`
|
||||
The threshold can be modified from it's default 1000 via the server parameter `druid.query.topN.minTopNThreshold` which need to restart servers to take effect or set `minTopNThreshold` in query context which take effect per query.
|
||||
|
||||
If you are wanting the top 100 of a high cardinality, uniformly distributed dimension ordered by some low-cardinality, uniformly distributed dimension, you are potentially going to get aggregates back that are missing data.
|
||||
|
||||
|
|
|
@ -470,7 +470,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
{
|
||||
final ThresholdAdjustingQueryRunner thresholdRunner = new ThresholdAdjustingQueryRunner(
|
||||
runner,
|
||||
config.getMinTopNThreshold()
|
||||
config
|
||||
);
|
||||
return new QueryRunner<Result<TopNResultValue>>()
|
||||
{
|
||||
|
@ -532,18 +532,18 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
return Ordering.natural();
|
||||
}
|
||||
|
||||
private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
|
||||
static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
|
||||
{
|
||||
private final QueryRunner<Result<TopNResultValue>> runner;
|
||||
private final int minTopNThreshold;
|
||||
private final TopNQueryConfig config;
|
||||
|
||||
public ThresholdAdjustingQueryRunner(
|
||||
QueryRunner<Result<TopNResultValue>> runner,
|
||||
int minTopNThreshold
|
||||
TopNQueryConfig config
|
||||
)
|
||||
{
|
||||
this.runner = runner;
|
||||
this.minTopNThreshold = minTopNThreshold;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -557,6 +557,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
}
|
||||
|
||||
final TopNQuery query = (TopNQuery) input;
|
||||
final int minTopNThreshold = query.getContextValue("minTopNThreshold", config.getMinTopNThreshold());
|
||||
if (query.getThreshold() > minTopNThreshold) {
|
||||
return runner.run(query, responseContext);
|
||||
}
|
||||
|
|
|
@ -22,25 +22,37 @@ package io.druid.query.topn;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.CacheStrategy;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.TestQueryRunners;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.TestIndex;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
public class TopNQueryQueryToolChestTest
|
||||
{
|
||||
|
||||
private static final String segmentId = "testSegment";
|
||||
|
||||
@Test
|
||||
public void testCacheStrategy() throws Exception
|
||||
{
|
||||
|
@ -93,4 +105,72 @@ public class TopNQueryQueryToolChestTest
|
|||
|
||||
Assert.assertEquals(result, fromCacheResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinTopNThreshold() throws Exception
|
||||
{
|
||||
TopNQueryConfig config = new TopNQueryConfig();
|
||||
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
|
||||
config,
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
);
|
||||
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
|
||||
TestQueryRunners.getPool(),
|
||||
chest,
|
||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||
);
|
||||
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
|
||||
factory,
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
|
||||
);
|
||||
|
||||
Map<String, Object> context = Maps.newHashMap();
|
||||
context.put("minTopNThreshold", 500);
|
||||
|
||||
TopNQueryBuilder builder = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(QueryRunnerTestHelper.placementishDimension)
|
||||
.metric(QueryRunnerTestHelper.indexMetric)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(QueryRunnerTestHelper.commonAggregators);
|
||||
|
||||
TopNQuery query1 = builder.threshold(10).context(null).build();
|
||||
MockQueryRunner mockRunner = new MockQueryRunner(runner);
|
||||
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
|
||||
.run(query1, ImmutableMap.<String, Object>of());
|
||||
Assert.assertEquals(1000, mockRunner.query.getThreshold());
|
||||
|
||||
TopNQuery query2 = builder.threshold(10).context(context).build();
|
||||
|
||||
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
|
||||
.run(query2, ImmutableMap.<String, Object>of());
|
||||
Assert.assertEquals(500, mockRunner.query.getThreshold());
|
||||
|
||||
TopNQuery query3 = builder.threshold(2000).context(context).build();
|
||||
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
|
||||
.run(query3, ImmutableMap.<String, Object>of());
|
||||
Assert.assertEquals(2000, mockRunner.query.getThreshold());
|
||||
}
|
||||
|
||||
static class MockQueryRunner implements QueryRunner<Result<TopNResultValue>>
|
||||
{
|
||||
private final QueryRunner<Result<TopNResultValue>> runner;
|
||||
TopNQuery query = null;
|
||||
|
||||
MockQueryRunner(QueryRunner<Result<TopNResultValue>> runner)
|
||||
{
|
||||
this.runner = runner;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<TopNResultValue>> run(
|
||||
Query<Result<TopNResultValue>> query,
|
||||
Map<String, Object> responseContext
|
||||
)
|
||||
{
|
||||
this.query = (TopNQuery) query;
|
||||
return query.run(runner, responseContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue