diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 0c78f6305fe..8eb77874bf4 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -64,11 +64,13 @@ import java.util.Map; public class TopNQueryQueryToolChest extends QueryToolChest, TopNQuery> { private static final byte TOPN_QUERY = 0x1; - private static final Joiner COMMA_JOIN = Joiner.on(","); - private static final TypeReference> TYPE_REFERENCE = new TypeReference>(){}; - - private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference(){}; + private static final TypeReference> TYPE_REFERENCE = new TypeReference>() + { + }; + private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference() + { + }; private final TopNQueryConfig config; @Inject @@ -163,7 +165,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> getOrdering() + { + return Ordering.natural(); + } + private static class ThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; @@ -398,9 +405,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest> getOrdering() - { - return Ordering.natural(); - } } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index cfea29f9a8e..07bec13f16b 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -47,6 +47,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.BySegmentResultValueClass; import io.druid.query.DataSource; import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; import io.druid.query.QueryConfig; @@ -115,17 +116,21 @@ import java.util.concurrent.Executor; @RunWith(Parameterized.class) public class CachingClusteredClientTest { + public static final ImmutableMap CONTEXT = ImmutableMap.of(); + public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); + public static final String DATA_SOURCE = "test"; + protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); + + static { + jsonMapper.getFactory().setCodec(jsonMapper); + } + /** * We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments * across servers. Thus, we loop multiple times and each time use a deterministically created Random instance. * Increase this value to increase exposure to random situations at the expense of test run time. */ private static final int RANDOMNESS = 10; - - public static final ImmutableMap CONTEXT = ImmutableMap.of(); - public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); - public static final String DATA_SOURCE = "test"; - private static final List AGGS = Arrays.asList( new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("imps", "imps"), @@ -152,6 +157,17 @@ public class CachingClusteredClientTest private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles"); private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); private static final String TOP_DIM = "a_dim"; + private final Random random; + protected VersionedIntervalTimeline timeline; + protected TimelineServerView serverView; + protected Cache cache; + public CachingClusteredClient client; + DruidServer[] servers; + + public CachingClusteredClientTest(int randomSeed) + { + this.random = new Random(randomSeed); + } @Parameterized.Parameters public static Collection constructorFeeder() throws IOException @@ -169,28 +185,6 @@ public class CachingClusteredClientTest ); } - - protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); - - static { - jsonMapper.getFactory().setCodec(jsonMapper); - } - - private final Random random; - - protected VersionedIntervalTimeline timeline; - protected TimelineServerView serverView; - protected Cache cache; - - CachingClusteredClient client; - - DruidServer[] servers; - - public CachingClusteredClientTest(int randomSeed) - { - this.random = new Random(randomSeed); - } - @Before public void setUp() throws Exception { @@ -214,15 +208,16 @@ public class CachingClusteredClientTest public void testTimeseriesCaching() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( + client, builder.build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000), new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000), @@ -265,9 +260,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -277,15 +272,16 @@ public class CachingClusteredClientTest public void testTimeseriesCachingTimeZone() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(PT1H_TZ_GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(PT1H_TZ_GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( + client, builder.build(), new Interval("2011-11-04/2011-11-08"), makeTimeResults( @@ -305,9 +301,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-11-04/2011-11-08") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -316,18 +312,23 @@ public class CachingClusteredClientTest public void testDisableUseCache() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS); testQueryCaching( + client, 1, true, - builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "true")).build(), + builder.context( + ImmutableMap.of( + "useCache", "false", + "populateCache", "true" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -338,10 +339,15 @@ public class CachingClusteredClientTest cache.close("0_0"); testQueryCaching( + client, 1, false, - builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "false")).build(), + builder.context( + ImmutableMap.of( + "useCache", "false", + "populateCache", "false" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -350,10 +356,15 @@ public class CachingClusteredClientTest Assert.assertEquals(0, cache.getStats().getNumMisses()); testQueryCaching( + client, 1, false, - builder.context(ImmutableMap.of("useCache", "true", - "populateCache", "false")).build(), + builder.context( + ImmutableMap.of( + "useCache", "true", + "populateCache", "false" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -378,7 +389,10 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + testQueryCaching( + runner, builder.build(), new Interval("2011-01-01/2011-01-02"), makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998), @@ -420,12 +434,12 @@ public class CachingClusteredClientTest new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), - client.run( + runner.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -446,7 +460,10 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + testQueryCaching( + runner, builder.build(), new Interval("2011-11-04/2011-11-08"), makeTopNResults( @@ -465,12 +482,12 @@ public class CachingClusteredClientTest new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986 ), - client.run( + runner.run( builder.intervals("2011-11-04/2011-11-08") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -491,7 +508,9 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); testQueryCaching( + runner, builder.build(), new Interval("2011-01-01/2011-01-02"), makeTopNResults(), @@ -518,6 +537,7 @@ public class CachingClusteredClientTest ) ); + TestHelper.assertExpectedResults( makeRenamedTopNResults( new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, @@ -531,12 +551,12 @@ public class CachingClusteredClientTest new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), - client.run( + runner.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -545,6 +565,7 @@ public class CachingClusteredClientTest public void testSearchCaching() throws Exception { testQueryCaching( + client, new SearchQuery( new TableDataSource(DATA_SOURCE), DIM_FILTER, @@ -582,13 +603,14 @@ public class CachingClusteredClientTest ); } - public void testQueryCaching(final Query query, Object... args) + public void testQueryCaching(QueryRunner runner, final Query query, Object... args) { - testQueryCaching(3, true, query, args); + testQueryCaching(runner, 3, true, query, args); } @SuppressWarnings("unchecked") public void testQueryCaching( + final QueryRunner runner, final int numTimesToQuery, boolean expectBySegment, final Query query, Object... args // does this assume query intervals must be ordered? @@ -638,8 +660,8 @@ public class CachingClusteredClientTest EasyMock.expect(serverView.getQueryRunner(server)) - .andReturn(expectations.getQueryRunner()) - .once(); + .andReturn(expectations.getQueryRunner()) + .once(); final Capture capture = new Capture(); queryCaptures.add(capture); @@ -656,8 +678,8 @@ public class CachingClusteredClientTest } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) + .once(); } else if (query instanceof TopNQuery) { List segmentIds = Lists.newArrayList(); @@ -669,8 +691,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof SearchQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -681,8 +703,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof TimeBoundaryQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -693,8 +715,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) + .once(); } else { throw new ISE("Unknown query type[%s]", query.getClass()); } @@ -742,7 +764,7 @@ public class CachingClusteredClientTest } ) ), - client.run( + runner.run( query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec( Arrays.asList( @@ -766,7 +788,7 @@ public class CachingClusteredClientTest } else { Assert.assertTrue( capturedQuery.getContextValue("bySegment") == null || - capturedQuery.getContextValue("bySegment").equals("false") + capturedQuery.getContextValue("bySegment").equals("false") ); } } @@ -1160,13 +1182,13 @@ public class CachingClusteredClientTest return new CachingClusteredClient( new MapQueryToolChestWarehouse( ImmutableMap., QueryToolChest>builder() - .put( - TimeseriesQuery.class, - new TimeseriesQueryQueryToolChest(new QueryConfig()) - ) - .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) - .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) - .build() + .put( + TimeseriesQuery.class, + new TimeseriesQueryQueryToolChest(new QueryConfig()) + ) + .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) + .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) + .build() ), new TimelineServerView() { @@ -1241,6 +1263,8 @@ public class CachingClusteredClientTest private class MyDataSegment extends DataSegment { + private final DataSegment baseSegment = segment; + private MyDataSegment() { super( @@ -1256,8 +1280,6 @@ public class CachingClusteredClientTest ); } - private final DataSegment baseSegment = segment; - @Override @JsonProperty public String getDataSource() @@ -1358,7 +1380,6 @@ public class CachingClusteredClientTest { private final DruidServer server; private final QueryRunner queryRunner; - private final List expectations = Lists.newArrayList(); public ServerExpectations(