add disable populate / use cache tests

Xavier Léauté 2014-03-05 13:58:38 -08:00
parent fffb58f6e8
commit 9fc5cca663
1 changed file with 168 additions and 47 deletions
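The new test exercises the "useCache" and "populateCache" query-context flags read by the caching client under test. As an illustrative sketch only (not part of this commit; DATA_SOURCE, SEG_SPEC, DIM_FILTER, GRANULARITY, AGGS and POST_AGGS are the existing test fixtures, and the local name is hypothetical), a caller toggles the flags through the query context:

final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
    .dataSource(DATA_SOURCE)
    .intervals(SEG_SPEC)
    .filters(DIM_FILTER)
    .granularity(GRANULARITY)
    .aggregators(AGGS)
    .postAggregators(POST_AGGS);

// populate the cache with fresh results, but never read from it
final Query writeOnly = builder
    .context(ImmutableMap.of("useCache", "false", "populateCache", "true"))
    .build();

Each of the three flag combinations below is then checked against the cache's entry, hit and miss counters.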


@@ -25,6 +25,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@@ -308,6 +309,56 @@ public class CachingClusteredClientTest
);
}
@Test
public void testDisableUseCache() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
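// Case 1 (populateCache=true, useCache=false): results are written to the cache but it is
// never read, so we expect one entry and neither hits nor misses.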
testQueryCaching(
1,
true,
builder.context(ImmutableMap.of("useCache", "false",
"populateCache", "true")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
Assert.assertEquals(1, cache.getStats().getNumEntries());
Assert.assertEquals(0, cache.getStats().getNumHits());
Assert.assertEquals(0, cache.getStats().getNumMisses());
cache.close("0_0");
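// Closing "0_0" presumably evicts the entry cached for the segment above, so the next case
// starts from an empty cache.
// Case 2 (populateCache=false, useCache=false): the cache is neither read nor written.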
testQueryCaching(
1,
false,
builder.context(ImmutableMap.of("useCache", "false",
"populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
Assert.assertEquals(0, cache.getStats().getNumEntries());
Assert.assertEquals(0, cache.getStats().getNumHits());
Assert.assertEquals(0, cache.getStats().getNumMisses());
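// Case 3 (populateCache=false, useCache=true): the (now empty) cache is consulted, producing
// a single miss, but the fetched result is not written back.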
testQueryCaching(
1,
false,
builder.context(ImmutableMap.of("useCache", "true",
"populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
Assert.assertEquals(0, cache.getStats().getNumEntries());
Assert.assertEquals(0, cache.getStats().getNumHits());
Assert.assertEquals(1, cache.getStats().getNumMisses());
}
@Test
@SuppressWarnings("unchecked")
public void testTopNCaching() throws Exception
@@ -528,9 +579,15 @@ public class CachingClusteredClientTest
);
}
public void testQueryCaching(final Query query, Object... args) {
testQueryCaching(3, true, query, args);
}
@SuppressWarnings("unchecked")
public void testQueryCaching(
final int numTimesToQuery,
boolean expectBySegment,
final Query query, Object... args // does this assume query intervals must be ordered?
)
{
if (args.length % 2 != 0) {
@@ -553,9 +610,6 @@ public class CachingClusteredClientTest
}
for (int i = 0; i < queryIntervals.size(); ++i) {
List<Object> mocks = Lists.newArrayList();
mocks.add(serverView);
@@ -563,46 +617,12 @@ public class CachingClusteredClientTest
queryIntervals.get(0).getStart(), queryIntervals.get(i).getEnd()
);
final List<Map<DruidServer, ServerExpectations>> serverExpectationList = populateTimeline(
queryIntervals,
expectedResults,
i,
mocks
);
List<Capture> queryCaptures = Lists.newArrayList();
final Map<DruidServer, ServerExpectations> finalExpectation = serverExpectationList.get(
@@ -612,7 +632,10 @@ public class CachingClusteredClientTest
DruidServer server = entry.getKey();
ServerExpectations expectations = entry.getValue();
EasyMock.expect(serverView.getQueryRunner(server))
.andReturn(expectations.getQueryRunner())
.once();
final Capture<? extends Query> capture = new Capture();
queryCaptures.add(capture);
@@ -627,9 +650,11 @@ public class CachingClusteredClientTest
intervals.add(expectation.getInterval());
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
} else if (query instanceof TopNQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
@@ -729,19 +754,81 @@ public class CachingClusteredClientTest
mocks.toArray()
);
// make sure the 'bySegment' context flag on the queries sent down matches the test's expectation
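// (assumption: the client asks for bySegment results only when it needs per-segment entries
// to populate the cache)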
for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue();
if (expectBySegment) {
Assert.assertEquals("true", capturedQuery.getContextValue("bySegment"));
}
else {
Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null ||
capturedQuery.getContextValue("bySegment").equals("false")
);
}
}
}
}
private List<Map<DruidServer, ServerExpectations>> populateTimeline(
List<Interval> queryIntervals,
List<List<Iterable<Result<Object>>>> expectedResults,
int numQueryIntervals,
List<Object> mocks
)
{
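// Rebuild the timeline and, for every query interval up to and including numQueryIntervals,
// assign each expected result chunk to a randomly picked server, recording the expectations
// that the mocked server view will later serve.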
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
final List<Map<DruidServer, ServerExpectations>> serverExpectationList = Lists.newArrayList();
for (int k = 0; k < numQueryIntervals + 1; ++k) {
final int numChunks = expectedResults.get(k).size();
final TreeMap<DruidServer, ServerExpectations> serverExpectations = Maps.newTreeMap();
serverExpectationList.add(serverExpectations);
for (int j = 0; j < numChunks; ++j) {
DruidServer lastServer = servers[random.nextInt(servers.length)];
if (!serverExpectations.containsKey(lastServer)) {
serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
}
ServerExpectation expectation = new ServerExpectation(
String.format("%s_%s", k, j), // interval/chunk
queryIntervals.get(numQueryIntervals),
makeMock(mocks, DataSegment.class),
expectedResults.get(k).get(j)
);
serverExpectations.get(lastServer).addExpectation(expectation);
ServerSelector selector = new ServerSelector(expectation.getSegment(), new RandomServerSelectorStrategy());
selector.addServer(new QueryableDruidServer(lastServer, null));
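// Register the segment in the timeline: a lone chunk becomes a SingleElementPartitionChunk,
// while multi-chunk segments use StringPartitionChunk boundaries of (j-1, j), left open at
// the first chunk and right open at the last.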
final PartitionChunk<ServerSelector> chunk;
if (numChunks == 1) {
chunk = new SingleElementPartitionChunk<>(selector);
} else {
String start = null;
String end = null;
if (j > 0) {
start = String.valueOf(j - 1);
}
if (j + 1 < numChunks) {
end = String.valueOf(j);
}
chunk = new StringPartitionChunk<>(start, end, j, selector);
}
timeline.add(queryIntervals.get(k), String.valueOf(k), chunk);
}
}
return serverExpectationList;
}
private Sequence<Result<TimeseriesResultValue>> toQueryableTimeseriesResults(
boolean bySegment,
Iterable<String> segmentIds,
Iterable<Interval> intervals,
Iterable<Iterable<Result<TimeseriesResultValue>>> results
)
{
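// bySegment results presumably carry the segment id and interval alongside each value;
// plain results are simply concatenated across segments.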
if (bySegment) {
return Sequences.simple(
FunctionalIterable
.create(segmentIds)
@@ -770,6 +857,9 @@ public class CachingClusteredClientTest
}
)
);
} else {
return Sequences.simple(Iterables.concat(results));
}
}
private Sequence<Result<TopNResultValue>> toQueryableTopNResults(
@@ -903,6 +993,37 @@ public class CachingClusteredClientTest
return retVal;
}
private Iterable<BySegmentResultValueClass<TimeseriesResultValue>> makeBySegmentTimeResults
(Object... objects)
{
if (objects.length % 5 != 0) {
throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length);
}
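// Assumption (by analogy with makeTimeResults): each group of 5 is
// timestamp, rows, imps, segment id, interval; the timestamp is not used directly here.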
List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5);
for (int i = 0; i < objects.length; i += 5) {
retVal.add(
new BySegmentResultValueClass<TimeseriesResultValue>(
Lists.newArrayList(
new TimeseriesResultValue(
ImmutableMap.of(
"rows", objects[i + 1],
"imps", objects[i + 2],
"impers", objects[i + 2],
"avg_imps_per_row",
((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
)
)
),
(String) objects[i + 3],
(Interval) objects[i + 4]
)
);
}
return retVal;
}
private Iterable<Result<TimeseriesResultValue>> makeRenamedTimeResults
(Object... objects)
{