Code cleanup for CachingClusteredClientTest

This commit is contained in:
Charles Allen 2015-08-25 12:49:55 -07:00
parent d996e0aecc
commit 85aeda834c
1 changed files with 168 additions and 90 deletions

View File

@ -17,7 +17,6 @@
package io.druid.client;
import com.amazonaws.services.support.model.Service;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
@ -48,8 +47,6 @@ import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.TrinaryFn;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
@ -121,7 +118,6 @@ import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import io.druid.timeline.partition.StringPartitionChunk;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
@ -138,7 +134,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@ -159,9 +154,11 @@ public class CachingClusteredClientTest
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
/**
* We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments
* across servers. Thus, we loop multiple times and each time use a deterministically created Random instance.
@ -205,7 +202,7 @@ public class CachingClusteredClientTest
new LongSumAggregatorFactory("impers2", "imps")
);
private static final DimFilter DIM_FILTER = null;
private static final List<PostAggregator> RENAMED_POST_AGGS = Arrays.asList();
private static final List<PostAggregator> RENAMED_POST_AGGS = ImmutableList.of();
private static final QueryGranularity GRANULARITY = QueryGranularity.DAY;
private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles");
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
@ -225,14 +222,14 @@ public class CachingClusteredClientTest
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return Lists.transform(
Lists.newArrayList(new RangeIterable(RANDOMNESS)),
new Function<Integer, Object>()
new Function<Integer, Object[]>()
{
@Override
public Object apply(@Nullable Integer input)
public Object[] apply(@Nullable Integer input)
{
return new Object[]{input};
}
@ -265,10 +262,12 @@ public class CachingClusteredClientTest
// The purpose of this special executor is to randomize execution of tasks on purpose.
// Since we don't know the number of tasks to be executed, a special DrainTask is used
// to trigger the actual execution when we are ready to shuffle the order.
abstract class DrainTask implements Runnable {}
abstract class DrainTask implements Runnable
{
}
final ForwardingListeningExecutorService randomizingExecutorService = new ForwardingListeningExecutorService()
{
final ConcurrentLinkedDeque<Pair<SettableFuture, Object>> taskQueue = new ConcurrentLinkedDeque();
final ConcurrentLinkedDeque<Pair<SettableFuture, Object>> taskQueue = new ConcurrentLinkedDeque<>();
final ListeningExecutorService delegate = MoreExecutors.listeningDecorator(
// we need to run everything in the same thread to ensure all callbacks on futures in CachingClusteredClient
// are complete before moving on to the next query run.
@ -371,8 +370,11 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
@ -407,8 +409,11 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
@ -476,8 +481,11 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
@ -536,8 +544,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
@ -580,8 +590,11 @@ public class CachingClusteredClientTest
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
1,
@ -652,8 +665,12 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
@ -725,8 +742,12 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
@ -763,7 +784,7 @@ public class CachingClusteredClientTest
public void testOutOfOrderSequenceMerging() throws Exception
{
List<Sequence<Result<TopNResultValue>>> sequences =
Lists.newArrayList(
ImmutableList.of(
Sequences.simple(
makeTopNResults(
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
@ -793,8 +814,10 @@ public class CachingClusteredClientTest
),
client.mergeCachedAndUncachedSequences(
sequences,
new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
);
}
@ -816,8 +839,12 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
builder.build(),
@ -886,8 +913,12 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching(
runner,
builder.build(),
@ -982,9 +1013,11 @@ public class CachingClusteredClientTest
)
);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest(
QueryRunner runner = new FinalizeResultsQueryRunner(
client, new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
@ -1085,8 +1118,10 @@ public class CachingClusteredClientTest
@Test
public void testGroupByCaching() throws Exception
{
List<AggregatorFactory> aggsWithUniques = ImmutableList.<AggregatorFactory>builder().addAll(AGGS)
.add(new HyperUniquesAggregatorFactory("uniques", "uniques")).build();
List<AggregatorFactory> aggsWithUniques = ImmutableList.<AggregatorFactory>builder()
.addAll(AGGS)
.add(new HyperUniquesAggregatorFactory("uniques", "uniques"))
.build();
final HashFunction hashFn = Hashing.murmur3_128();
@ -1108,27 +1143,43 @@ public class CachingClusteredClientTest
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1, "uniques", collector)),
makeGroupByResults(
new DateTime("2011-01-01"),
ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1, "uniques", collector)
),
new Interval("2011-01-02/2011-01-03"),
makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2, "uniques", collector)),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
new DateTime("2011-01-02"),
ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2, "uniques", collector)
),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
new DateTime("2011-01-05"),
ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06"),
ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07"),
ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08"),
ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09"),
ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05T01"),
ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06T01"),
ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07T01"),
ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08T01"),
ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09T01"),
ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
)
);
@ -1165,16 +1216,26 @@ public class CachingClusteredClientTest
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedObjects(
makeGroupByResults(
new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
new DateTime("2011-01-05T"),
ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-05T01"),
ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06T"),
ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-06T01"),
ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07T"),
ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-07T01"),
ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08T"),
ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-08T01"),
ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09T"),
ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector),
new DateTime("2011-01-09T01"),
ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
),
runner.run(
builder.setInterval("2011-01-05/2011-01-10")
@ -1277,7 +1338,7 @@ public class CachingClusteredClientTest
);
}
return Arrays.asList(
return ImmutableList.of(
new Result<>(
timestamp,
new TimeBoundaryResultValue(value)
@ -1475,7 +1536,7 @@ public class CachingClusteredClientTest
runner.run(
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
Arrays.asList(
ImmutableList.of(
actualQueryInterval
)
)
@ -1937,7 +1998,7 @@ public class CachingClusteredClientTest
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
List values = Lists.newArrayList();
List<SearchHit> values = Lists.newArrayList();
while (index < objects.length && !(objects[index] instanceof DateTime)) {
values.add(new SearchHit(TOP_DIM, objects[index++].toString()));
}
@ -1954,7 +2015,7 @@ public class CachingClusteredClientTest
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
List values = Lists.newArrayList();
List<EventHolder> values = Lists.newArrayList();
while (index < objects.length && !(objects[index] instanceof DateTime)) {
values.add(new EventHolder(null, 0, (Map) objects[index++]));
}
@ -1966,11 +2027,11 @@ public class CachingClusteredClientTest
private Iterable<Row> makeGroupByResults(Object... objects)
{
List retVal = Lists.newArrayList();
List<Row> retVal = Lists.newArrayList();
int index = 0;
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
retVal.add(new MapBasedRow(timestamp, (Map) objects[index++]));
retVal.add(new MapBasedRow(timestamp, (Map<String, Object>) objects[index++]));
}
return retVal;
}
@ -2002,16 +2063,27 @@ public class CachingClusteredClientTest
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
TopNQuery.class, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
SearchQuery.class, new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()))
.put(
SelectQuery.class,
new SelectQueryQueryToolChest(jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
new SelectQueryQueryToolChest(
jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
GroupByQuery.class,
@ -2370,10 +2442,16 @@ public class CachingClusteredClientTest
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)),
makeGroupByResults(
new DateTime("2011-01-01"),
ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)
),
new Interval("2011-01-02/2011-01-03"),
makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)),
makeGroupByResults(
new DateTime("2011-01-02"),
ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)
),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(