diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 9a5154e5824..78cfa370df1 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -308,6 +309,56 @@ public class CachingClusteredClientTest ); } + @Test + public void testDisableUseCache() throws Exception + { + final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS); + + testQueryCaching( + 1, + true, + builder.context(ImmutableMap.of("useCache", "false", + "populateCache", "true")).build(), + new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) + ); + + Assert.assertEquals(1, cache.getStats().getNumEntries()); + Assert.assertEquals(0, cache.getStats().getNumHits()); + Assert.assertEquals(0, cache.getStats().getNumMisses()); + + cache.close("0_0"); + + testQueryCaching( + 1, + false, + builder.context(ImmutableMap.of("useCache", "false", + "populateCache", "false")).build(), + new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) + ); + + Assert.assertEquals(0, cache.getStats().getNumEntries()); + Assert.assertEquals(0, cache.getStats().getNumHits()); + Assert.assertEquals(0, cache.getStats().getNumMisses()); + + testQueryCaching( + 1, + false, + builder.context(ImmutableMap.of("useCache", "true", + "populateCache", "false")).build(), + new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) + ); + + Assert.assertEquals(0, cache.getStats().getNumEntries()); + Assert.assertEquals(0, cache.getStats().getNumHits()); + Assert.assertEquals(1, cache.getStats().getNumMisses()); + } + @Test @SuppressWarnings("unchecked") public void testTopNCaching() throws Exception @@ -528,9 +579,15 @@ public class CachingClusteredClientTest ); } + public void testQueryCaching(final Query query, Object... args) { + testQueryCaching(3, true, query, args); + } + @SuppressWarnings("unchecked") public void testQueryCaching( - final Query query, Object... args + final int numTimesToQuery, + boolean expectBySegment, + final Query query, Object... args // does this assume query intervals must be ordered? ) { if (args.length % 2 != 0) { @@ -553,9 +610,6 @@ public class CachingClusteredClientTest } for (int i = 0; i < queryIntervals.size(); ++i) { - timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - final int numTimesToQuery = 3; - List mocks = Lists.newArrayList(); mocks.add(serverView); @@ -563,46 +617,12 @@ public class CachingClusteredClientTest queryIntervals.get(0).getStart(), queryIntervals.get(i).getEnd() ); - final List> serverExpectationList = Lists.newArrayList(); - - for (int k = 0; k < i + 1; ++k) { - final int numChunks = expectedResults.get(k).size(); - final TreeMap serverExpectations = Maps.newTreeMap(); - serverExpectationList.add(serverExpectations); - for (int j = 0; j < numChunks; ++j) { - DruidServer lastServer = servers[random.nextInt(servers.length)]; - if (!serverExpectations.containsKey(lastServer)) { - serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); - } - - ServerExpectation expectation = new ServerExpectation( - String.format("%s_%s", k, j), - queryIntervals.get(i), - makeMock(mocks, DataSegment.class), - expectedResults.get(k).get(j) - ); - serverExpectations.get(lastServer).addExpectation(expectation); - - ServerSelector selector = new ServerSelector(expectation.getSegment(), new RandomServerSelectorStrategy()); - selector.addServer(new QueryableDruidServer(lastServer, null)); - - final PartitionChunk chunk; - if (numChunks == 1) { - chunk = new SingleElementPartitionChunk<>(selector); - } else { - String start = null; - String end = null; - if (j > 0) { - start = String.valueOf(j - 1); - } - if (j + 1 < numChunks) { - end = String.valueOf(j); - } - chunk = new StringPartitionChunk<>(start, end, j, selector); - } - timeline.add(queryIntervals.get(k), String.valueOf(k), chunk); - } - } + final List> serverExpectationList = populateTimeline( + queryIntervals, + expectedResults, + i, + mocks + ); List queryCaptures = Lists.newArrayList(); final Map finalExpectation = serverExpectationList.get( @@ -612,7 +632,10 @@ public class CachingClusteredClientTest DruidServer server = entry.getKey(); ServerExpectations expectations = entry.getValue(); - EasyMock.expect(serverView.getQueryRunner(server)).andReturn(expectations.getQueryRunner()).once(); + + EasyMock.expect(serverView.getQueryRunner(server)) + .andReturn(expectations.getQueryRunner()) + .once(); final Capture capture = new Capture(); queryCaptures.add(capture); @@ -627,9 +650,11 @@ public class CachingClusteredClientTest intervals.add(expectation.getInterval()); results.add(expectation.getResults()); } + EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeseriesResults(segmentIds, intervals, results)) + .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) .once(); + } else if (query instanceof TopNQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -729,19 +754,81 @@ public class CachingClusteredClientTest mocks.toArray() ); + // make sure all the queries were sent down as 'bySegment' for (Capture queryCapture : queryCaptures) { Query capturedQuery = (Query) queryCapture.getValue(); - Assert.assertEquals("true", capturedQuery.getContextValue("bySegment")); + if(expectBySegment) { + Assert.assertEquals("true", capturedQuery.getContextValue("bySegment")); + } + else { + Assert.assertTrue( + capturedQuery.getContextValue("bySegment") == null || + capturedQuery.getContextValue("bySegment").equals("false") + ); + } } } } + private List> populateTimeline( + List queryIntervals, + List>>> expectedResults, + int numQueryIntervals, + List mocks + ) + { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + + final List> serverExpectationList = Lists.newArrayList(); + + for (int k = 0; k < numQueryIntervals + 1; ++k) { + final int numChunks = expectedResults.get(k).size(); + final TreeMap serverExpectations = Maps.newTreeMap(); + serverExpectationList.add(serverExpectations); + for (int j = 0; j < numChunks; ++j) { + DruidServer lastServer = servers[random.nextInt(servers.length)]; + if (!serverExpectations.containsKey(lastServer)) { + serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); + } + + ServerExpectation expectation = new ServerExpectation( + String.format("%s_%s", k, j), // interval/chunk + queryIntervals.get(numQueryIntervals), + makeMock(mocks, DataSegment.class), + expectedResults.get(k).get(j) + ); + serverExpectations.get(lastServer).addExpectation(expectation); + + ServerSelector selector = new ServerSelector(expectation.getSegment(), new RandomServerSelectorStrategy()); + selector.addServer(new QueryableDruidServer(lastServer, null)); + + final PartitionChunk chunk; + if (numChunks == 1) { + chunk = new SingleElementPartitionChunk<>(selector); + } else { + String start = null; + String end = null; + if (j > 0) { + start = String.valueOf(j - 1); + } + if (j + 1 < numChunks) { + end = String.valueOf(j); + } + chunk = new StringPartitionChunk<>(start, end, j, selector); + } + timeline.add(queryIntervals.get(k), String.valueOf(k), chunk); + } + } return serverExpectationList; + } + private Sequence> toQueryableTimeseriesResults( + boolean bySegment, Iterable segmentIds, Iterable intervals, Iterable>> results ) { + if(bySegment) { return Sequences.simple( FunctionalIterable .create(segmentIds) @@ -770,6 +857,9 @@ public class CachingClusteredClientTest } ) ); + } else { + return Sequences.simple(Iterables.concat(results)); + } } private Sequence> toQueryableTopNResults( @@ -903,6 +993,37 @@ public class CachingClusteredClientTest return retVal; } + private Iterable> makeBySegmentTimeResults + (Object... objects) + { + if (objects.length % 5 != 0) { + throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length); + } + + List> retVal = Lists.newArrayListWithCapacity(objects.length / 5); + for (int i = 0; i < objects.length; i += 5) { + retVal.add( + new BySegmentResultValueClass( + Lists.newArrayList( + new TimeseriesResultValue( + ImmutableMap.of( + "rows", objects[i + 1], + "imps", objects[i + 2], + "impers", objects[i + 2], + "avg_imps_per_row", + ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue() + ) + ) + ), + (String)objects[i+3], + (Interval)objects[i+4] + + ) + ); + } + return retVal; + } + private Iterable> makeRenamedTimeResults (Object... objects) {