adding support to execute Select queries in AggregationTestHelper so that Select query based UTs can be written for complex aggregator implementations

This commit is contained in:
Himanshu Gupta 2016-01-05 21:36:52 -06:00
parent 6d886da7d9
commit 3f048f0b15
6 changed files with 122 additions and 23 deletions

View File

@ -56,7 +56,7 @@ public class SketchAggregationTest
{ {
SketchModule sm = new SketchModule(); SketchModule sm = new SketchModule();
sm.configure(null); sm.configure(null);
helper = new AggregationTestHelper(sm.getJacksonModules(), tempFolder); helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(sm.getJacksonModules(), tempFolder);
} }
@Test @Test

View File

@ -55,7 +55,7 @@ public class OldApiSketchAggregationTest
OldApiSketchModule sm = new OldApiSketchModule(); OldApiSketchModule sm = new OldApiSketchModule();
sm.configure(null); sm.configure(null);
helper = new AggregationTestHelper( helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
sm.getJacksonModules(), sm.getJacksonModules(),
tempFolder tempFolder
); );

View File

@ -43,7 +43,10 @@ public class ApproximateHistogramAggregationTest
{ {
ApproximateHistogramDruidModule module = new ApproximateHistogramDruidModule(); ApproximateHistogramDruidModule module = new ApproximateHistogramDruidModule();
module.configure(null); module.configure(null);
helper = new AggregationTestHelper(Lists.newArrayList(module.getJacksonModules()), tempFolder); helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
Lists.newArrayList(module.getJacksonModules()),
tempFolder
);
} }
@Test @Test

View File

@ -72,7 +72,7 @@ public class MultiValuedDimensionTest
public MultiValuedDimensionTest() throws Exception public MultiValuedDimensionTest() throws Exception
{ {
helper = new AggregationTestHelper( helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
ImmutableList.<Module>of(), null ImmutableList.<Module>of(), null
); );
} }

View File

@ -19,9 +19,14 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
@ -30,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@ -46,13 +52,17 @@ import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher; import io.druid.query.QueryWatcher;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.select.SelectQueryEngine;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.select.SelectQueryRunnerFactory;
import io.druid.segment.IndexIO; import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger; import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec; import io.druid.segment.IndexSpec;
@ -88,17 +98,39 @@ public class AggregationTestHelper
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final IndexMerger indexMerger; private final IndexMerger indexMerger;
private final IndexIO indexIO; private final IndexIO indexIO;
private final GroupByQueryQueryToolChest toolChest; private final QueryToolChest toolChest;
private final GroupByQueryRunnerFactory factory; private final QueryRunnerFactory factory;
private final TemporaryFolder tempFolder; private final TemporaryFolder tempFolder;
public AggregationTestHelper(List<? extends Module> jsonModulesToRegister, TemporaryFolder tempFoler) private AggregationTestHelper(
ObjectMapper mapper,
IndexMerger indexMerger,
IndexIO indexIO,
QueryToolChest toolchest,
QueryRunnerFactory factory,
TemporaryFolder tempFolder,
List<? extends Module> jsonModulesToRegister
)
{ {
this.tempFolder = tempFoler; this.mapper = mapper;
mapper = new DefaultObjectMapper(); this.indexMerger = indexMerger;
indexIO = TestHelper.getTestIndexIO(); this.indexIO = indexIO;
indexMerger = TestHelper.getTestIndexMerger(); this.toolChest = toolchest;
this.factory = factory;
this.tempFolder = tempFolder;
for(Module mod : jsonModulesToRegister) {
mapper.registerModule(mod);
}
}
public static final AggregationTestHelper createGroupByQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
{
ObjectMapper mapper = new DefaultObjectMapper();
for(Module mod : jsonModulesToRegister) { for(Module mod : jsonModulesToRegister) {
mapper.registerModule(mod); mapper.registerModule(mod);
@ -125,17 +157,59 @@ public class AggregationTestHelper
}; };
GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool); GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
toolChest = new GroupByQueryQueryToolChest( GroupByQueryQueryToolChest toolchest = new GroupByQueryQueryToolChest(
configSupplier, mapper, engine, pool, configSupplier, mapper, engine, pool,
NoopIntervalChunkingQueryRunnerDecorator() NoopIntervalChunkingQueryRunnerDecorator()
); );
factory = new GroupByQueryRunnerFactory( GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine, engine,
noopQueryWatcher, noopQueryWatcher,
configSupplier, configSupplier,
toolChest, toolchest,
pool pool
); );
return new AggregationTestHelper(
mapper,
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexIO(),
toolchest,
factory,
tempFolder,
jsonModulesToRegister
);
}
public static final AggregationTestHelper createSelectQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
{
ObjectMapper mapper = new DefaultObjectMapper();
SelectQueryQueryToolChest toolchest = new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
SelectQueryRunnerFactory factory = new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
return new AggregationTestHelper(
mapper,
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexIO(),
toolchest,
factory,
tempFolder,
jsonModulesToRegister
);
} }
public Sequence<Row> createIndexAndRunQueryOnSegment( public Sequence<Row> createIndexAndRunQueryOnSegment(
@ -289,12 +363,12 @@ public class AggregationTestHelper
//Simulates running group-by query on individual segments as historicals would do, json serialize the results //Simulates running group-by query on individual segments as historicals would do, json serialize the results
//from each segment, later deserialize and merge and finally return the results //from each segment, later deserialize and merge and finally return the results
public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final String groupByQueryJson) throws Exception public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final String queryJson) throws Exception
{ {
return runQueryOnSegments(segmentDirs, mapper.readValue(groupByQueryJson, GroupByQuery.class)); return runQueryOnSegments(segmentDirs, mapper.readValue(queryJson, Query.class));
} }
public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final GroupByQuery query) public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final Query query)
{ {
final List<Segment> segments = Lists.transform( final List<Segment> segments = Lists.transform(
segmentDirs, segmentDirs,
@ -322,7 +396,7 @@ public class AggregationTestHelper
} }
} }
public Sequence<Row> runQueryOnSegmentsObjs(final List<Segment> segments, final GroupByQuery query) public Sequence<Row> runQueryOnSegmentsObjs(final List<Segment> segments, final Query query)
{ {
final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner(
toolChest.postMergeQueryDecoration( toolChest.postMergeQueryDecoration(
@ -385,10 +459,11 @@ public class AggregationTestHelper
); );
String resultStr = mapper.writer().writeValueAsString(yielder); String resultStr = mapper.writer().writeValueAsString(yielder);
TypeFactory typeFactory = mapper.getTypeFactory();
JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference());
List resultRows = Lists.transform( List resultRows = Lists.transform(
(List<Row>)mapper.readValue( readQueryResultArrayFromString(resultStr),
resultStr, new TypeReference<List<Row>>() {}
),
toolChest.makePreComputeManipulatorFn( toolChest.makePreComputeManipulatorFn(
query, query,
MetricManipulatorFns.deserializing() MetricManipulatorFns.deserializing()
@ -402,6 +477,24 @@ public class AggregationTestHelper
}; };
} }
private List readQueryResultArrayFromString(String str) throws Exception
{
List result = new ArrayList();
JsonParser jp = mapper.getFactory().createParser(str);
if (jp.nextToken() != JsonToken.START_ARRAY) {
throw new IAE("not an array [%s]", str);
}
ObjectCodec objectCodec = jp.getCodec();
while(jp.nextToken() != JsonToken.END_ARRAY) {
result.add(objectCodec.readValue(jp, toolChest.getResultTypeReference()));
}
return result;
}
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator() public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
{ {
return new IntervalChunkingQueryRunnerDecorator(null, null, null) { return new IntervalChunkingQueryRunnerDecorator(null, null, null) {

View File

@ -41,7 +41,10 @@ public class HyperUniquesAggregationTest
@Test @Test
public void testIngestAndQuery() throws Exception public void testIngestAndQuery() throws Exception
{ {
AggregationTestHelper helper = new AggregationTestHelper(Lists.newArrayList(new AggregatorsModule()), tempFolder); AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
Lists.newArrayList(new AggregatorsModule()),
tempFolder
);
String metricSpec = "[{" String metricSpec = "[{"
+ "\"type\": \"hyperUnique\"," + "\"type\": \"hyperUnique\","