fix tests: run the TopN tests through a FinalizeResultsQueryRunner so results are finalized, reuse the already-computed post-aggregator value in TopNQueryQueryToolChest, and clean up formatting

nishantmonu51 2014-04-08 02:37:57 +05:30
parent db35009acd
commit 4bb36dd453
2 changed files with 131 additions and 108 deletions

TopNQueryQueryToolChest.java

@@ -64,11 +64,13 @@ import java.util.Map;
public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
{
private static final byte TOPN_QUERY = 0x1;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>(){};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){};
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final TopNQueryConfig config;
@Inject
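
Note: the reformatted declarations above use Jackson's super-type-token idiom. TypeReference has to be subclassed, usually anonymously, so the generic type survives erasure and can be read back reflectively. A minimal standalone sketch of the idiom (not part of this commit):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class TypeTokenSketch
{
  // The anonymous subclass records List<String> as its generic superclass's
  // type argument, which Jackson recovers at deserialization time.
  private static final TypeReference<List<String>> LIST_OF_STRINGS =
      new TypeReference<List<String>>()
      {
      };

  public static void main(String[] args) throws Exception
  {
    List<String> parsed = new ObjectMapper().readValue("[\"a\",\"b\"]", LIST_OF_STRINGS);
    System.out.println(parsed); // prints [a, b]
  }
}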
@@ -163,7 +165,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), input.getMetric(postAgg.getName()));
values.put(postAgg.getName(), calculatedPostAgg);
} else {
values.put(postAgg.getName(), postAgg.compute(values));
}
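
Note: the first values.put line above is the old code, the second its replacement. The fix is the compute-once pattern: the metric was looked up, null-checked, then looked up again; now the first lookup is reused, and only when it is absent does the post-aggregator compute from the accumulated values. Reduced to a standalone sketch with hypothetical names (not Druid code):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ComputeOnceSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> values = new HashMap<>();
    values.put("rows", 50L);
    values.put("imps", 5000L);

    // Hypothetical post-aggregation: average impressions per row.
    String name = "avg_imps";
    Function<Map<String, Object>, Object> compute =
        v -> ((Long) v.get("imps")).doubleValue() / (Long) v.get("rows");

    // Look the value up once and reuse it, instead of fetching it a second
    // time inside the branch (the redundant lookup this hunk removes).
    Object precomputed = values.get(name);
    values.put(name, precomputed != null ? precomputed : compute.apply(values));

    System.out.println(values.get(name)); // 100.0
  }
}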
@@ -314,6 +316,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new ThresholdAdjustingQueryRunner(runner, config.getMinTopNThreshold());
}
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
@@ -398,9 +405,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
);
}
}
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
}

CachingClusteredClientTest.java

@@ -47,6 +47,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
@@ -115,17 +116,21 @@ import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class CachingClusteredClientTest
{
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
/**
* We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments
* across servers. Thus, we loop multiple times and each time use a deterministically created Random instance.
* Increase this value to increase exposure to random situations at the expense of test run time.
*/
private static final int RANDOMNESS = 10;
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("imps", "imps"),
@@ -152,6 +157,17 @@ public class CachingClusteredClientTest
private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles");
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
private static final String TOP_DIM = "a_dim";
private final Random random;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
public CachingClusteredClient client;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
this.random = new Random(randomSeed);
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@@ -169,28 +185,6 @@ public class CachingClusteredClientTest
);
}
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
private final Random random;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
CachingClusteredClient client;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
this.random = new Random(randomSeed);
}
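
Note: the fields and constructor removed here were moved above constructorFeeder() in the previous hunk. The relocation highlights the test's seeding strategy (see the javadoc on RANDOMNESS): each Parameterized run gets a Random built from a fixed seed, so any failure reproduces exactly. The body of constructorFeeder() is elided in this diff, so the loop below is an assumption about its shape, not the committed code:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SeedFeederSketch
{
  private static final int RANDOMNESS = 10;

  // One Object[] per JUnit @Parameterized run; each array feeds the test
  // constructor, here a single int seed in [0, RANDOMNESS).
  public static Collection<Object[]> constructorFeeder()
  {
    List<Object[]> seeds = new ArrayList<>();
    for (int seed = 0; seed < RANDOMNESS; seed++) {
      seeds.add(new Object[]{seed});
    }
    return seeds;
  }

  public static void main(String[] args)
  {
    System.out.println(constructorFeeder().size()); // 10 deterministic seeds
  }
}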
@Before
public void setUp() throws Exception
{
@@ -214,15 +208,16 @@ public class CachingClusteredClientTest
public void testTimeseriesCaching() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
testQueryCaching(
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000),
new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000),
@@ -265,9 +260,9 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@@ -277,15 +272,16 @@ public class CachingClusteredClientTest
public void testTimeseriesCachingTimeZone() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
testQueryCaching(
client,
builder.build(),
new Interval("2011-11-04/2011-11-08"),
makeTimeResults(
@@ -305,9 +301,9 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@@ -316,18 +312,23 @@ public class CachingClusteredClientTest
public void testDisableUseCache() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
testQueryCaching(
client,
1,
true,
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
"populateCache", "true")).build(),
builder.context(
ImmutableMap.<String, Object>of(
"useCache", "false",
"populateCache", "true"
)
).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@@ -338,10 +339,15 @@ public class CachingClusteredClientTest
cache.close("0_0");
testQueryCaching(
client,
1,
false,
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
"populateCache", "false")).build(),
builder.context(
ImmutableMap.<String, Object>of(
"useCache", "false",
"populateCache", "false"
)
).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@@ -350,10 +356,15 @@ public class CachingClusteredClientTest
Assert.assertEquals(0, cache.getStats().getNumMisses());
testQueryCaching(
client,
1,
false,
builder.context(ImmutableMap.<String, Object>of("useCache", "true",
"populateCache", "false")).build(),
builder.context(
ImmutableMap.<String, Object>of(
"useCache", "true",
"populateCache", "false"
)
).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
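
Note: taken together, the three testQueryCaching calls in this test walk the useCache/populateCache matrix: populate-only (writes but never reads), fully disabled (no hits, no misses), and read-only (a hit for the previously populated segment, no new writes). A minimal sketch of how such string-valued context flags are typically consulted; the helper below is hypothetical, not the CachingClusteredClient implementation:

import java.util.Map;

public class CacheFlagsSketch
{
  // Hypothetical helper: cache flags arrive as strings in the query context
  // and default to enabled when absent.
  static boolean flag(Map<String, ?> context, String key)
  {
    Object value = context.get(key);
    return value == null || !"false".equals(value.toString());
  }

  public static void main(String[] args)
  {
    Map<String, String> ctx = Map.of("useCache", "false", "populateCache", "true");
    System.out.println(flag(ctx, "useCache"));      // false: skip cache reads
    System.out.println(flag(ctx, "populateCache")); // true: still write results
  }
}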
@@ -378,7 +389,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998),
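
Note: this is the substantive test fix that recurs through the TopN tests below. The client is no longer queried directly; it is wrapped in a FinalizeResultsQueryRunner built from the query's toolchest, so results are finalized the way the full query stack would finalize them. FinalizeResultsQueryRunner is a decorator: it delegates to the wrapped runner and post-processes what comes back. Stripped of Druid's types, the pattern looks like this (a standalone sketch with hypothetical names):

import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class FinalizingRunnerSketch
{
  interface Runner
  {
    List<Double> run(String query);
  }

  // Decorator: run the inner runner, then apply a finalization step to every
  // result (analogous to finalizing post-aggregators).
  static Runner finalizing(Runner inner, UnaryOperator<Double> finalize)
  {
    return query -> inner.run(query).stream().map(finalize).collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    Runner raw = query -> List.of(1.0, 2.0, 3.0);   // stand-in cluster results
    Runner wrapped = finalizing(raw, v -> v * 100); // stand-in finalization
    System.out.println(wrapped.run("topN"));        // [100.0, 200.0, 300.0]
  }
}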
@@ -420,12 +434,12 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
),
client.run(
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@@ -446,7 +460,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-11-04/2011-11-08"),
makeTopNResults(
@@ -465,12 +482,12 @@ public class CachingClusteredClientTest
new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986
),
client.run(
runner.run(
builder.intervals("2011-11-04/2011-11-08")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@@ -491,7 +508,9 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeTopNResults(),
@@ -518,6 +537,7 @@ public class CachingClusteredClientTest
)
);
TestHelper.assertExpectedResults(
makeRenamedTopNResults(
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
@@ -531,12 +551,12 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
),
client.run(
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@@ -545,6 +565,7 @@ public class CachingClusteredClientTest
public void testSearchCaching() throws Exception
{
testQueryCaching(
client,
new SearchQuery(
new TableDataSource(DATA_SOURCE),
DIM_FILTER,
@@ -582,13 +603,14 @@ public class CachingClusteredClientTest
);
}
public void testQueryCaching(final Query query, Object... args)
public void testQueryCaching(QueryRunner runner, final Query query, Object... args)
{
testQueryCaching(3, true, query, args);
testQueryCaching(runner, 3, true, query, args);
}
@SuppressWarnings("unchecked")
public void testQueryCaching(
final QueryRunner runner,
final int numTimesToQuery,
boolean expectBySegment,
final Query query, Object... args // does this assume query intervals must be ordered?
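
Note: the varargs convention used by every caller of testQueryCaching is alternating (Interval, expectedResults) pairs, and the trailing comment above suggests the pairs are consumed in call order. A standalone sketch of decoding such alternating varargs, with stand-in types:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AlternatingVarargsSketch
{
  // Decode (key, value, key, value, ...) varargs, preserving call order —
  // which is why callers would need their intervals ordered.
  @SuppressWarnings("unchecked")
  static Map<String, List<Integer>> decode(Object... args)
  {
    Map<String, List<Integer>> decoded = new LinkedHashMap<>();
    for (int i = 0; i < args.length; i += 2) {
      decoded.put((String) args[i], (List<Integer>) args[i + 1]);
    }
    return decoded;
  }

  public static void main(String[] args)
  {
    System.out.println(decode("2011-01-01/2011-01-02", List.of(50, 5000),
                              "2011-01-02/2011-01-03", List.of(30, 6000)));
  }
}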
@@ -638,8 +660,8 @@ public class CachingClusteredClientTest
EasyMock.expect(serverView.getQueryRunner(server))
.andReturn(expectations.getQueryRunner())
.once();
.andReturn(expectations.getQueryRunner())
.once();
final Capture<? extends Query> capture = new Capture();
queryCaptures.add(capture);
@@ -656,8 +678,8 @@ public class CachingClusteredClientTest
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
} else if (query instanceof TopNQuery) {
List<String> segmentIds = Lists.newArrayList();
@@ -669,8 +691,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once();
} else if (query instanceof SearchQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
@@ -681,8 +703,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
} else if (query instanceof TimeBoundaryQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
@@ -693,8 +715,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once();
} else {
throw new ISE("Unknown query type[%s]", query.getClass());
}
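
Note: each branch above wires one EasyMock expectation per query type — capture the query handed to the mocked runner, return canned queryable results, and insist on exactly one invocation. The core capture/expect pattern, reduced to a standalone sketch:

import org.easymock.Capture;
import org.easymock.EasyMock;

public class MockExpectationSketch
{
  interface Runner
  {
    String run(String query);
  }

  public static void main(String[] args)
  {
    Runner queryable = EasyMock.createMock(Runner.class);
    Capture<String> capture = new Capture<String>();

    // Expect exactly one run() call; capture its argument for later asserts.
    EasyMock.expect(queryable.run(EasyMock.capture(capture)))
            .andReturn("canned results")
            .once();
    EasyMock.replay(queryable);

    System.out.println(queryable.run("topN")); // canned results
    System.out.println(capture.getValue());    // topN
    EasyMock.verify(queryable);
  }
}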
@@ -742,7 +764,7 @@ public class CachingClusteredClientTest
}
)
),
client.run(
runner.run(
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
Arrays.asList(
@@ -766,7 +788,7 @@ public class CachingClusteredClientTest
} else {
Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null ||
capturedQuery.getContextValue("bySegment").equals("false")
capturedQuery.getContextValue("bySegment").equals("false")
);
}
}
@@ -1160,13 +1182,13 @@ public class CachingClusteredClientTest
return new CachingClusteredClient(
new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build()
),
new TimelineServerView()
{
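
Note: the warehouse built above is a straight class-to-toolchest map — given a query, look up its concrete class and hand back the matching toolchest. As a standalone sketch of that dispatch, with stand-in types rather than the Druid classes:

import java.util.HashMap;
import java.util.Map;

public class ToolChestWarehouseSketch
{
  interface Query {}
  interface ToolChest {}

  static class TimeseriesQuery implements Query {}
  static class TimeseriesToolChest implements ToolChest {}

  // Map-backed warehouse: dispatch on the query's runtime class.
  private final Map<Class<? extends Query>, ToolChest> toolChests = new HashMap<>();

  ToolChest getToolChest(Query query)
  {
    return toolChests.get(query.getClass());
  }

  public static void main(String[] args)
  {
    ToolChestWarehouseSketch warehouse = new ToolChestWarehouseSketch();
    warehouse.toolChests.put(TimeseriesQuery.class, new TimeseriesToolChest());
    System.out.println(warehouse.getToolChest(new TimeseriesQuery()).getClass().getSimpleName());
  }
}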
@@ -1241,6 +1263,8 @@ public class CachingClusteredClientTest
private class MyDataSegment extends DataSegment
{
private final DataSegment baseSegment = segment;
private MyDataSegment()
{
super(
@@ -1256,8 +1280,6 @@ public class CachingClusteredClientTest
);
}
private final DataSegment baseSegment = segment;
@Override
@JsonProperty
public String getDataSource()
@@ -1358,7 +1380,6 @@ public class CachingClusteredClientTest
{
private final DruidServer server;
private final QueryRunner queryRunner;
private final List<ServerExpectation> expectations = Lists.newArrayList();
public ServerExpectations(