From 3f048f0b155e5778fc20e707197628d8b21815c3 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Jan 2016 21:36:52 -0600 Subject: [PATCH 1/3] adding support to execute Select queries in AggregationTestHelper so that Select query based UTs can be written for complex aggregator implementations --- .../theta/SketchAggregationTest.java | 2 +- .../oldapi/OldApiSketchAggregationTest.java | 2 +- .../ApproximateHistogramAggregationTest.java | 5 +- .../druid/query/MultiValuedDimensionTest.java | 2 +- .../aggregation/AggregationTestHelper.java | 129 +++++++++++++++--- .../HyperUniquesAggregationTest.java | 5 +- 6 files changed, 122 insertions(+), 23 deletions(-) diff --git a/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 35549e0e80b..f2eb1ce87ed 100644 --- a/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -56,7 +56,7 @@ public class SketchAggregationTest { SketchModule sm = new SketchModule(); sm.configure(null); - helper = new AggregationTestHelper(sm.getJacksonModules(), tempFolder); + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(sm.getJacksonModules(), tempFolder); } @Test diff --git a/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java b/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java index ad7d1937231..b7bad2c527d 100644 --- a/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java +++ b/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java @@ -55,7 +55,7 @@ public class OldApiSketchAggregationTest OldApiSketchModule sm = new OldApiSketchModule(); sm.configure(null); - helper = new AggregationTestHelper( + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( sm.getJacksonModules(), tempFolder ); diff --git a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java index 9168ca31541..27b52c18d24 100644 --- a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java +++ b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java @@ -43,7 +43,10 @@ public class ApproximateHistogramAggregationTest { ApproximateHistogramDruidModule module = new ApproximateHistogramDruidModule(); module.configure(null); - helper = new AggregationTestHelper(Lists.newArrayList(module.getJacksonModules()), tempFolder); + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Lists.newArrayList(module.getJacksonModules()), + tempFolder + ); } @Test diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index 0fefbce30a6..054692025ac 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -72,7 +72,7 @@ public class MultiValuedDimensionTest public MultiValuedDimensionTest() throws Exception { - helper = new AggregationTestHelper( + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( ImmutableList.of(), null ); } diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index e3adad4f8df..d7b39080971 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -19,9 +19,14 @@ package io.druid.query.aggregation; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -30,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.IAE; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -46,13 +52,17 @@ import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; import io.druid.query.QueryWatcher; -import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; +import io.druid.query.select.SelectQueryEngine; +import io.druid.query.select.SelectQueryQueryToolChest; +import io.druid.query.select.SelectQueryRunnerFactory; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; @@ -88,17 +98,39 @@ public class AggregationTestHelper private final ObjectMapper mapper; private final IndexMerger indexMerger; private final IndexIO indexIO; - private final GroupByQueryQueryToolChest toolChest; - private final GroupByQueryRunnerFactory factory; + private final QueryToolChest toolChest; + private final QueryRunnerFactory factory; private final TemporaryFolder tempFolder; - public AggregationTestHelper(List jsonModulesToRegister, TemporaryFolder tempFoler) + private AggregationTestHelper( + ObjectMapper mapper, + IndexMerger indexMerger, + IndexIO indexIO, + QueryToolChest toolchest, + QueryRunnerFactory factory, + TemporaryFolder tempFolder, + List jsonModulesToRegister + ) { - this.tempFolder = tempFoler; - mapper = new DefaultObjectMapper(); - indexIO = TestHelper.getTestIndexIO(); - indexMerger = TestHelper.getTestIndexMerger(); + this.mapper = mapper; + this.indexMerger = indexMerger; + this.indexIO = indexIO; + this.toolChest = toolchest; + this.factory = factory; + this.tempFolder = tempFolder; + + for(Module mod : jsonModulesToRegister) { + mapper.registerModule(mod); + } + } + + public static final AggregationTestHelper createGroupByQueryAggregationTestHelper( + List jsonModulesToRegister, + TemporaryFolder tempFolder + ) + { + ObjectMapper mapper = new DefaultObjectMapper(); for(Module mod : jsonModulesToRegister) { mapper.registerModule(mod); @@ -125,17 +157,59 @@ public class AggregationTestHelper }; GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool); - toolChest = new GroupByQueryQueryToolChest( + GroupByQueryQueryToolChest toolchest = new GroupByQueryQueryToolChest( configSupplier, mapper, engine, pool, NoopIntervalChunkingQueryRunnerDecorator() ); - factory = new GroupByQueryRunnerFactory( + GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( engine, noopQueryWatcher, configSupplier, - toolChest, + toolchest, pool ); + + return new AggregationTestHelper( + mapper, + TestHelper.getTestIndexMerger(), + TestHelper.getTestIndexIO(), + toolchest, + factory, + tempFolder, + jsonModulesToRegister + ); + } + + public static final AggregationTestHelper createSelectQueryAggregationTestHelper( + List jsonModulesToRegister, + TemporaryFolder tempFolder + ) + { + ObjectMapper mapper = new DefaultObjectMapper(); + + SelectQueryQueryToolChest toolchest = new SelectQueryQueryToolChest( + new DefaultObjectMapper(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + + SelectQueryRunnerFactory factory = new SelectQueryRunnerFactory( + new SelectQueryQueryToolChest( + new DefaultObjectMapper(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ), + new SelectQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + + return new AggregationTestHelper( + mapper, + TestHelper.getTestIndexMerger(), + TestHelper.getTestIndexIO(), + toolchest, + factory, + tempFolder, + jsonModulesToRegister + ); } public Sequence createIndexAndRunQueryOnSegment( @@ -289,12 +363,12 @@ public class AggregationTestHelper //Simulates running group-by query on individual segments as historicals would do, json serialize the results //from each segment, later deserialize and merge and finally return the results - public Sequence runQueryOnSegments(final List segmentDirs, final String groupByQueryJson) throws Exception + public Sequence runQueryOnSegments(final List segmentDirs, final String queryJson) throws Exception { - return runQueryOnSegments(segmentDirs, mapper.readValue(groupByQueryJson, GroupByQuery.class)); + return runQueryOnSegments(segmentDirs, mapper.readValue(queryJson, Query.class)); } - public Sequence runQueryOnSegments(final List segmentDirs, final GroupByQuery query) + public Sequence runQueryOnSegments(final List segmentDirs, final Query query) { final List segments = Lists.transform( segmentDirs, @@ -322,7 +396,7 @@ public class AggregationTestHelper } } - public Sequence runQueryOnSegmentsObjs(final List segments, final GroupByQuery query) + public Sequence runQueryOnSegmentsObjs(final List segments, final Query query) { final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( toolChest.postMergeQueryDecoration( @@ -385,10 +459,11 @@ public class AggregationTestHelper ); String resultStr = mapper.writer().writeValueAsString(yielder); + TypeFactory typeFactory = mapper.getTypeFactory(); + JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference()); + List resultRows = Lists.transform( - (List)mapper.readValue( - resultStr, new TypeReference>() {} - ), + readQueryResultArrayFromString(resultStr), toolChest.makePreComputeManipulatorFn( query, MetricManipulatorFns.deserializing() @@ -402,6 +477,24 @@ public class AggregationTestHelper }; } + private List readQueryResultArrayFromString(String str) throws Exception + { + List result = new ArrayList(); + + JsonParser jp = mapper.getFactory().createParser(str); + + if (jp.nextToken() != JsonToken.START_ARRAY) { + throw new IAE("not an array [%s]", str); + } + + ObjectCodec objectCodec = jp.getCodec(); + + while(jp.nextToken() != JsonToken.END_ARRAY) { + result.add(objectCodec.readValue(jp, toolChest.getResultTypeReference())); + } + return result; + } + public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator() { return new IntervalChunkingQueryRunnerDecorator(null, null, null) { diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java index d700710e3e8..3e33dcc047d 100644 --- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java @@ -41,7 +41,10 @@ public class HyperUniquesAggregationTest @Test public void testIngestAndQuery() throws Exception { - AggregationTestHelper helper = new AggregationTestHelper(Lists.newArrayList(new AggregatorsModule()), tempFolder); + AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Lists.newArrayList(new AggregatorsModule()), + tempFolder + ); String metricSpec = "[{" + "\"type\": \"hyperUnique\"," From 62e5e45da8057318bc401b5c73445b2eae24c8b1 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Jan 2016 21:53:44 -0600 Subject: [PATCH 2/3] add select query UT for thetaSketch --- .../theta/SketchAggregationTest.java | 33 +++++++++++++++++-- .../src/test/resources/select_query.json | 11 +++++++ 2 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 extensions/datasketches/src/test/resources/select_query.json diff --git a/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index f2eb1ce87ed..9fcb46e63ca 100644 --- a/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -20,6 +20,7 @@ package io.druid.query.aggregation.datasketches.theta; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; import com.metamx.common.guava.Sequence; @@ -28,10 +29,12 @@ import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; import io.druid.data.input.MapBasedRow; import io.druid.granularity.QueryGranularity; +import io.druid.query.Result; import io.druid.query.aggregation.AggregationTestHelper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.post.FieldAccessPostAggregator; +import io.druid.query.select.SelectResultValue; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; @@ -60,7 +63,7 @@ public class SketchAggregationTest } @Test - public void testSimpleDataIngestAndQuery() throws Exception + public void testSimpleDataIngestAndGpByQuery() throws Exception { Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()), @@ -92,7 +95,33 @@ public class SketchAggregationTest } @Test - public void testSketchDataIngestAndQuery() throws Exception + public void testSimpleDataIngestAndSelectQuery() throws Exception + { + SketchModule sm = new SketchModule(); + sm.configure(null); + AggregationTestHelper selectQueryAggregationTestHelper = AggregationTestHelper.createSelectQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + Sequence seq = selectQueryAggregationTestHelper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()), + readFileFromClasspathAsString("simple_test_data_record_parser.json"), + readFileFromClasspathAsString("simple_test_data_aggregators.json"), + 0, + QueryGranularity.NONE, + 5000, + readFileFromClasspathAsString("select_query.json") + ); + + Result result = (Result) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList())); + Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp()); + Assert.assertEquals(100, result.getValue().getEvents().size()); + Assert.assertEquals("AgMDAAAazJMBAAAAAACAPzz9j7pWTMdR", result.getValue().getEvents().get(0).getEvent().get("pty_country")); + } + + @Test + public void testSketchDataIngestAndGpByQuery() throws Exception { Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()), diff --git a/extensions/datasketches/src/test/resources/select_query.json b/extensions/datasketches/src/test/resources/select_query.json new file mode 100644 index 00000000000..f56116b54a8 --- /dev/null +++ b/extensions/datasketches/src/test/resources/select_query.json @@ -0,0 +1,11 @@ +{ + "queryType": "select", + "dataSource": "test_datasource", + "dimensions":[], + "metrics":[], + "granularity": "ALL", + "intervals": [ + "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" + ], + "pagingSpec":{"pagingIdentifiers": {}, "threshold":100} +} From c6634d7c2c6bbb570b697ddcffdc69ca06709d62 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Jan 2016 00:01:19 -0600 Subject: [PATCH 3/3] adding json for thetaSketch Memory object representation --- .../theta/MemoryJsonSerializer.java | 40 +++++++++++++++++++ .../datasketches/theta/SketchModule.java | 7 +++- .../datasketches/theta/SketchOperations.java | 7 +++- 3 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java diff --git a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java new file mode 100644 index 00000000000..2590c1fc3fb --- /dev/null +++ b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.theta; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.yahoo.sketches.memory.Memory; + +import java.io.IOException; + +/** + */ +public class MemoryJsonSerializer extends JsonSerializer +{ + @Override + public void serialize(Memory mem, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeBinary(SketchOperations.deserializeFromMemory(mem).toByteArray()); + } +} diff --git a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java index 1969494963d..84b0853cf22 100644 --- a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; +import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.theta.Sketch; import io.druid.initialization.DruidModule; import io.druid.segment.serde.ComplexMetrics; @@ -66,7 +67,11 @@ public class SketchModule implements DruidModule new NamedType(SketchEstimatePostAggregator.class, THETA_SKETCH_ESTIMATE_POST_AGG), new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG) ) - .addSerializer(Sketch.class, new SketchJsonSerializer()) + .addSerializer( + Sketch.class, new SketchJsonSerializer() + ) + .addSerializer( + Memory.class, new MemoryJsonSerializer()) ); } } diff --git a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java index a40931bb030..b86ef333bc0 100644 --- a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java +++ b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java @@ -22,6 +22,7 @@ package io.druid.query.aggregation.datasketches.theta; import com.google.common.base.Charsets; import com.metamx.common.logger.Logger; import com.yahoo.sketches.Family; +import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.theta.AnotB; import com.yahoo.sketches.theta.Intersection; @@ -72,7 +73,11 @@ public class SketchOperations public static Sketch deserializeFromByteArray(byte[] data) { - NativeMemory mem = new NativeMemory(data); + return deserializeFromMemory(new NativeMemory(data)); + } + + public static Sketch deserializeFromMemory(Memory mem) + { if (Sketch.getSerializationVersion(mem) < 3) { return Sketches.heapifySketch(mem); } else {