This commit is contained in:
binlijin 2016-01-07 18:01:46 +08:00
parent a6bfcc5bfd
commit 010c6e959c
2 changed files with 81 additions and 1 deletions

View File

@ -532,7 +532,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return Ordering.natural();
}
private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
private final TopNQueryConfig config;

View File

@ -22,25 +22,37 @@ package io.druid.query.topn;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.TestIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
public class TopNQueryQueryToolChestTest
{
private static final String segmentId = "testSegment";
@Test
public void testCacheStrategy() throws Exception
{
@ -93,4 +105,72 @@ public class TopNQueryQueryToolChestTest
Assert.assertEquals(result, fromCacheResult);
}
@Test
public void testMinTopNThreshold() throws Exception
{
TopNQueryConfig config = new TopNQueryConfig();
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
config,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
chest,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
factory,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
);
Map<String, Object> context = Maps.newHashMap();
context.put("minTopNThreshold", 500);
TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.placementishDimension)
.metric(QueryRunnerTestHelper.indexMetric)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(QueryRunnerTestHelper.commonAggregators);
TopNQuery query1 = builder.threshold(10).context(null).build();
MockQueryRunner mockRunner = new MockQueryRunner(runner);
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
.run(query1, ImmutableMap.<String, Object>of());
Assert.assertEquals(1000, mockRunner.query.getThreshold());
TopNQuery query2 = builder.threshold(10).context(context).build();
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
.run(query2, ImmutableMap.<String, Object>of());
Assert.assertEquals(500, mockRunner.query.getThreshold());
TopNQuery query3 = builder.threshold(2000).context(context).build();
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
.run(query3, ImmutableMap.<String, Object>of());
Assert.assertEquals(2000, mockRunner.query.getThreshold());
}
static class MockQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
TopNQuery query = null;
MockQueryRunner(QueryRunner<Result<TopNResultValue>> runner)
{
this.runner = runner;
}
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> query,
Map<String, Object> responseContext
)
{
this.query = (TopNQuery) query;
return query.run(runner, responseContext);
}
}
}