diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 9ceb714a354..a825f487da3 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -21,6 +21,7 @@ package io.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; import com.google.common.primitives.Doubles; import com.google.common.primitives.Ints; import com.yahoo.sketches.Family; @@ -30,7 +31,6 @@ import com.yahoo.sketches.theta.SetOperation; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; import com.yahoo.sketches.theta.Union; - import io.druid.java.util.common.IAE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; @@ -52,14 +52,18 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory protected final int size; private final byte cacheId; - public static final Comparator COMPARATOR = new Comparator() - { - @Override - public int compare(Sketch o, Sketch o1) - { - return Doubles.compare(o.getEstimate(), o1.getEstimate()); - } - }; + public static final Comparator COMPARATOR = Ordering.from( + new Comparator() + { + @Override + public int compare(Object o1, Object o2) + { + Sketch s1 = SketchAggregatorFactory.toSketch(o1); + Sketch s2 = SketchAggregatorFactory.toSketch(o2); + return Doubles.compare(s1.getEstimate(), s2.getEstimate()); + } + } + ).nullsFirst(); public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId) { @@ -103,7 +107,7 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory } @Override - public Comparator getComparator() + public Comparator getComparator() { return COMPARATOR; } @@ -191,6 +195,17 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory .array(); } + public final static Sketch toSketch(Object obj) + { + if (obj instanceof Sketch) { + return (Sketch) obj; + } else if (obj instanceof Union) { + return ((Union) obj).getResult(true, null); + } else { + throw new IAE("Can't convert to Sketch object [%s]", obj.getClass()); + } + } + @Override public String toString() { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 4c5156c58f4..42e852a0a20 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -82,7 +82,7 @@ public class SketchEstimatePostAggregator implements PostAggregator @Override public Object compute(Map combinedAggregators) { - Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(combinedAggregators)); + Sketch sketch = SketchAggregatorFactory.toSketch(field.compute(combinedAggregators)); if (errorBoundsStdDev != null) { SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( sketch.getEstimate(), diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index 122d1a39dfd..c128958c101 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -123,7 +123,7 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory public Object finalizeComputation(Object object) { if (shouldFinalize) { - Sketch sketch = (Sketch) object; + Sketch sketch = SketchAggregatorFactory.toSketch(object); if (errorBoundsStdDev != null) { SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( sketch.getEstimate(), diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java index 70a5504c464..15932887af5 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java @@ -26,7 +26,6 @@ import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; import com.yahoo.sketches.theta.Union; - import io.druid.java.util.common.IAE; import io.druid.segment.data.ObjectStrategy; @@ -41,13 +40,14 @@ public class SketchObjectStrategy implements ObjectStrategy @Override public int compare(Object s1, Object s2) { - if (s1 instanceof Sketch) { - if (s2 instanceof Sketch) { - return SketchAggregatorFactory.COMPARATOR.compare((Sketch) s1, (Sketch) s2); + if (s1 instanceof Sketch || s1 instanceof Union) { + if (s2 instanceof Sketch || s2 instanceof Union) { + return SketchAggregatorFactory.COMPARATOR.compare(s1, s2); } else { return -1; } } + if (s1 instanceof Memory) { if (s2 instanceof Memory) { Memory s1Mem = (Memory) s1; @@ -66,6 +66,7 @@ public class SketchObjectStrategy implements ObjectStrategy return 1; } } + throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java index 533cf965685..095d87c9713 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java @@ -56,6 +56,8 @@ public class SketchOperations return deserializeFromByteArray((byte[]) serializedSketch); } else if (serializedSketch instanceof Sketch) { return (Sketch) serializedSketch; + } else if (serializedSketch instanceof Union) { + return ((Union) serializedSketch).getResult(true, null); } throw new IllegalStateException( diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index a84f3a32bfe..a4f023cd431 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; import com.yahoo.sketches.Util; import com.yahoo.sketches.theta.Sketch; -import com.yahoo.sketches.theta.Union; - import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.PostAggregator; @@ -75,7 +73,7 @@ public class SketchSetPostAggregator implements PostAggregator } @Override - public Comparator getComparator() + public Comparator getComparator() { return SketchAggregatorFactory.COMPARATOR; } @@ -85,23 +83,12 @@ public class SketchSetPostAggregator implements PostAggregator { Sketch[] sketches = new Sketch[fields.size()]; for (int i = 0; i < sketches.length; i++) { - sketches[i] = toSketch(fields.get(i).compute(combinedAggregators)); + sketches[i] = SketchAggregatorFactory.toSketch(fields.get(i).compute(combinedAggregators)); } return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches); } - public final static Sketch toSketch(Object obj) - { - if (obj instanceof Sketch) { - return (Sketch) obj; - } else if (obj instanceof Union) { - return ((Union) obj).getResult(true, null); - } else { - throw new IAE("Can't convert to Sketch object [%s]", obj.getClass()); - } - } - @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 3d027176bee..d6243e47d0b 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -21,22 +21,22 @@ package io.druid.query.aggregation.datasketches.theta; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; +import com.yahoo.sketches.Family; +import com.yahoo.sketches.theta.SetOperation; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.Union; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularities; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.Result; import io.druid.query.aggregation.AggregationTestHelper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.post.FieldAccessPostAggregator; -import io.druid.query.select.SelectResultValue; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; @@ -47,6 +47,7 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Comparator; import java.util.List; /** @@ -65,119 +66,6 @@ public class SketchAggregationTest helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(sm.getJacksonModules(), tempFolder); } - @Test - public void testSimpleDataIngestAndGpByQuery() throws Exception - { - Sequence seq = helper.createIndexAndRunQueryOnSegment( - new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()), - readFileFromClasspathAsString("simple_test_data_record_parser.json"), - readFileFromClasspathAsString("simple_test_data_aggregators.json"), - 0, - QueryGranularities.NONE, - 5, - readFileFromClasspathAsString("simple_test_data_group_by_query.json") - ); - - List results = Sequences.toList(seq, Lists.newArrayList()); - Assert.assertEquals(5, results.size()); - Assert.assertEquals( - ImmutableList.of( - new MapBasedRow( - DateTime.parse("2014-10-19T00:00:00.000Z"), - ImmutableMap - .builder() - .put("product", "product_3") - .put("sketch_count", 38.0) - .put("sketchEstimatePostAgg", 38.0) - .put("sketchUnionPostAggEstimate", 38.0) - .put("sketchIntersectionPostAggEstimate", 38.0) - .put("sketchAnotBPostAggEstimate", 0.0) - .put("non_existing_col_validation", 0.0) - .build() - ), - new MapBasedRow( - DateTime.parse("2014-10-19T00:00:00.000Z"), - ImmutableMap - .builder() - .put("product", "product_1") - .put("sketch_count", 42.0) - .put("sketchEstimatePostAgg", 42.0) - .put("sketchUnionPostAggEstimate", 42.0) - .put("sketchIntersectionPostAggEstimate", 42.0) - .put("sketchAnotBPostAggEstimate", 0.0) - .put("non_existing_col_validation", 0.0) - .build() - ), - new MapBasedRow( - DateTime.parse("2014-10-19T00:00:00.000Z"), - ImmutableMap - .builder() - .put("product", "product_2") - .put("sketch_count", 42.0) - .put("sketchEstimatePostAgg", 42.0) - .put("sketchUnionPostAggEstimate", 42.0) - .put("sketchIntersectionPostAggEstimate", 42.0) - .put("sketchAnotBPostAggEstimate", 0.0) - .put("non_existing_col_validation", 0.0) - .build() - ), - new MapBasedRow( - DateTime.parse("2014-10-19T00:00:00.000Z"), - ImmutableMap - .builder() - .put("product", "product_4") - .put("sketch_count", 42.0) - .put("sketchEstimatePostAgg", 42.0) - .put("sketchUnionPostAggEstimate", 42.0) - .put("sketchIntersectionPostAggEstimate", 42.0) - .put("sketchAnotBPostAggEstimate", 0.0) - .put("non_existing_col_validation", 0.0) - .build() - ), - new MapBasedRow( - DateTime.parse("2014-10-19T00:00:00.000Z"), - ImmutableMap - .builder() - .put("product", "product_5") - .put("sketch_count", 42.0) - .put("sketchEstimatePostAgg", 42.0) - .put("sketchUnionPostAggEstimate", 42.0) - .put("sketchIntersectionPostAggEstimate", 42.0) - .put("sketchAnotBPostAggEstimate", 0.0) - .put("non_existing_col_validation", 0.0) - .build() - ) - ), - results - ); - } - - @Test - public void testSimpleDataIngestAndSelectQuery() throws Exception - { - SketchModule sm = new SketchModule(); - sm.configure(null); - AggregationTestHelper selectQueryAggregationTestHelper = AggregationTestHelper.createSelectQueryAggregationTestHelper( - sm.getJacksonModules(), - tempFolder - ); - - Sequence seq = selectQueryAggregationTestHelper.createIndexAndRunQueryOnSegment( - new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()), - readFileFromClasspathAsString("simple_test_data_record_parser.json"), - readFileFromClasspathAsString("simple_test_data_aggregators.json"), - 0, - QueryGranularities.NONE, - 5000, - readFileFromClasspathAsString("select_query.json") - ); - - Result result = (Result) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList())); - Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp()); - Assert.assertEquals(100, result.getValue().getEvents().size()); - Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country")); - } - @Test public void testSketchDataIngestAndGpByQuery() throws Exception { @@ -453,6 +341,34 @@ public class SketchAggregationTest ); } + @Test + public void testSketchAggregatorFactoryComparator() + { + Comparator comparator = SketchAggregatorFactory.COMPARATOR; + Assert.assertEquals(0, comparator.compare(null, null)); + + Union union1 = (Union) SetOperation.builder().build(1<<4, Family.UNION); + union1.update("a"); + union1.update("b"); + Sketch sketch1 = union1.getResult(); + + Assert.assertEquals(-1, comparator.compare(null, sketch1)); + Assert.assertEquals(1, comparator.compare(sketch1, null)); + + Union union2 = (Union) SetOperation.builder().build(1<<4, Family.UNION); + union2.update("a"); + union2.update("b"); + union2.update("c"); + Sketch sketch2 = union2.getResult(); + + Assert.assertEquals(-1, comparator.compare(sketch1, sketch2)); + Assert.assertEquals(-1, comparator.compare(sketch1, union2)); + Assert.assertEquals(1, comparator.compare(sketch2, sketch1)); + Assert.assertEquals(1, comparator.compare(sketch2, union1)); + Assert.assertEquals(1, comparator.compare(union2, union1)); + Assert.assertEquals(1, comparator.compare(union2, sketch1)); + } + private void assertPostAggregatorSerde(PostAggregator agg) throws Exception { Assert.assertEquals( diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTestWithSimpleData.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTestWithSimpleData.java new file mode 100644 index 00000000000..5b62d76fabf --- /dev/null +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTestWithSimpleData.java @@ -0,0 +1,267 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query.aggregation.datasketches.theta; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregationTestHelper; +import io.druid.query.select.SelectResultValue; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.query.topn.DimensionAndMetricValueExtractor; +import io.druid.query.topn.TopNResultValue; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; + +/** + */ +public class SketchAggregationTestWithSimpleData +{ + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private SketchModule sm; + private File s1; + private File s2; + + @Before + public void setup() throws Exception + { + sm = new SketchModule(); + sm.configure(null); + AggregationTestHelper toolchest = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + s1 = tempFolder.newFolder(); + toolchest.createIndex( + new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()), + readFileFromClasspathAsString("simple_test_data_record_parser.json"), + readFileFromClasspathAsString("simple_test_data_aggregators.json"), + s1, + 0, + QueryGranularities.NONE, + 5000 + ); + + s2 = tempFolder.newFolder(); + toolchest.createIndex( + new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()), + readFileFromClasspathAsString("simple_test_data_record_parser.json"), + readFileFromClasspathAsString("simple_test_data_aggregators.json"), + s2, + 0, + QueryGranularities.NONE, + 5000 + ); + } + + + @Test + public void testSimpleDataIngestAndGpByQuery() throws Exception + { + AggregationTestHelper gpByQueryAggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + Sequence seq = gpByQueryAggregationTestHelper.runQueryOnSegments( + ImmutableList.of(s1, s2), + readFileFromClasspathAsString("simple_test_data_group_by_query.json") + ); + + List results = Sequences.toList(seq, Lists.newArrayList()); + Assert.assertEquals(5, results.size()); + Assert.assertEquals( + ImmutableList.of( + new MapBasedRow( + DateTime.parse("2014-10-19T00:00:00.000Z"), + ImmutableMap + .builder() + .put("product", "product_3") + .put("sketch_count", 38.0) + .put("sketchEstimatePostAgg", 38.0) + .put("sketchUnionPostAggEstimate", 38.0) + .put("sketchIntersectionPostAggEstimate", 38.0) + .put("sketchAnotBPostAggEstimate", 0.0) + .put("non_existing_col_validation", 0.0) + .build() + ), + new MapBasedRow( + DateTime.parse("2014-10-19T00:00:00.000Z"), + ImmutableMap + .builder() + .put("product", "product_1") + .put("sketch_count", 42.0) + .put("sketchEstimatePostAgg", 42.0) + .put("sketchUnionPostAggEstimate", 42.0) + .put("sketchIntersectionPostAggEstimate", 42.0) + .put("sketchAnotBPostAggEstimate", 0.0) + .put("non_existing_col_validation", 0.0) + .build() + ), + new MapBasedRow( + DateTime.parse("2014-10-19T00:00:00.000Z"), + ImmutableMap + .builder() + .put("product", "product_2") + .put("sketch_count", 42.0) + .put("sketchEstimatePostAgg", 42.0) + .put("sketchUnionPostAggEstimate", 42.0) + .put("sketchIntersectionPostAggEstimate", 42.0) + .put("sketchAnotBPostAggEstimate", 0.0) + .put("non_existing_col_validation", 0.0) + .build() + ), + new MapBasedRow( + DateTime.parse("2014-10-19T00:00:00.000Z"), + ImmutableMap + .builder() + .put("product", "product_4") + .put("sketch_count", 42.0) + .put("sketchEstimatePostAgg", 42.0) + .put("sketchUnionPostAggEstimate", 42.0) + .put("sketchIntersectionPostAggEstimate", 42.0) + .put("sketchAnotBPostAggEstimate", 0.0) + .put("non_existing_col_validation", 0.0) + .build() + ), + new MapBasedRow( + DateTime.parse("2014-10-19T00:00:00.000Z"), + ImmutableMap + .builder() + .put("product", "product_5") + .put("sketch_count", 42.0) + .put("sketchEstimatePostAgg", 42.0) + .put("sketchUnionPostAggEstimate", 42.0) + .put("sketchIntersectionPostAggEstimate", 42.0) + .put("sketchAnotBPostAggEstimate", 0.0) + .put("non_existing_col_validation", 0.0) + .build() + ) + ), + results + ); + } + + @Test + public void testSimpleDataIngestAndTimeseriesQuery() throws Exception + { + AggregationTestHelper timeseriesQueryAggregationTestHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + Sequence seq = timeseriesQueryAggregationTestHelper.runQueryOnSegments( + ImmutableList.of(s1, s2), + readFileFromClasspathAsString("timeseries_query.json") + ); + + Result result = (Result) Iterables.getOnlyElement( + Sequences.toList(seq, Lists.newArrayList()) + ); + + Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp()); + + Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketch_count"), 0.01); + Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketchEstimatePostAgg"), 0.01); + Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketchUnionPostAggEstimate"), 0.01); + Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01); + Assert.assertEquals(0.0, result.getValue().getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01); + Assert.assertEquals(0.0, result.getValue().getDoubleMetric("non_existing_col_validation"), 0.01); + } + + + @Test + public void testSimpleDataIngestAndTopNQuery() throws Exception + { + AggregationTestHelper topNQueryAggregationTestHelper = AggregationTestHelper.createTopNQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments( + ImmutableList.of(s1, s2), + readFileFromClasspathAsString("topn_query.json") + ); + + Result result = (Result) Iterables.getOnlyElement( + Sequences.toList(seq, Lists.newArrayList()) + ); + + Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp()); + + DimensionAndMetricValueExtractor value = Iterables.getOnlyElement(result.getValue().getValue()); + Assert.assertEquals(38.0, value.getDoubleMetric("sketch_count"), 0.01); + Assert.assertEquals(38.0, value.getDoubleMetric("sketchEstimatePostAgg"), 0.01); + Assert.assertEquals(38.0, value.getDoubleMetric("sketchUnionPostAggEstimate"), 0.01); + Assert.assertEquals(38.0, value.getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01); + Assert.assertEquals(0.0, value.getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01); + Assert.assertEquals(0.0, value.getDoubleMetric("non_existing_col_validation"), 0.01); + Assert.assertEquals("product_3", value.getDimensionValue("product")); + } + + @Test + public void testSimpleDataIngestAndSelectQuery() throws Exception + { + SketchModule sm = new SketchModule(); + sm.configure(null); + AggregationTestHelper selectQueryAggregationTestHelper = AggregationTestHelper.createSelectQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + Sequence seq = selectQueryAggregationTestHelper.runQueryOnSegments( + ImmutableList.of(s1, s2), + readFileFromClasspathAsString("select_query.json") + ); + + Result result = (Result) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList())); + Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp()); + Assert.assertEquals(100, result.getValue().getEvents().size()); + Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country")); + } + + public final static String readFileFromClasspathAsString(String fileName) throws IOException + { + return Files.asCharSource( + new File(SketchAggregationTest.class.getClassLoader().getResource(fileName).getFile()), + Charset.forName("UTF-8") + ).read(); + } +} diff --git a/extensions-core/datasketches/src/test/resources/timeseries_query.json b/extensions-core/datasketches/src/test/resources/timeseries_query.json new file mode 100644 index 00000000000..439d7585e9a --- /dev/null +++ b/extensions-core/datasketches/src/test/resources/timeseries_query.json @@ -0,0 +1,92 @@ +{ + "queryType": "timeseries", + "dataSource": "test_datasource", + "granularity":"ALL", + "aggregations": [ + { + "type": "thetaSketch", + "name": "sketch_count", + "fieldName": "pty_country", + "size": 16384 + }, + { + "type": "thetaSketch", + "name": "non_existing_col_validation", + "fieldName": "non_existing_col", + "size": 16384 + } + ], + "postAggregations": [ + { + "type": "thetaSketchEstimate", + "name": "sketchEstimatePostAgg", + "field": { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchIntersectionPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchIntersectionPostAgg", + "func": "INTERSECT", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + ] + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchAnotBPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchAnotBUnionPostAgg", + "func": "NOT", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + ] + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchUnionPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchUnionPostAgg", + "func": "UNION", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + ] + } + } + ], + "intervals": [ + "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" + ] +} diff --git a/extensions-core/datasketches/src/test/resources/topn_query.json b/extensions-core/datasketches/src/test/resources/topn_query.json new file mode 100644 index 00000000000..3701b1f5b29 --- /dev/null +++ b/extensions-core/datasketches/src/test/resources/topn_query.json @@ -0,0 +1,98 @@ +{ + "queryType": "topN", + "dataSource": "test_datasource", + "granularity":"ALL", + "metric": { + "type": "inverted", + "metric": "sketch_count" + }, + "dimension": "product", + "threshold": 1, + "aggregations": [ + { + "type": "thetaSketch", + "name": "sketch_count", + "fieldName": "pty_country", + "size": 16384 + }, + { + "type": "thetaSketch", + "name": "non_existing_col_validation", + "fieldName": "non_existing_col", + "size": 16384 + } + ], + "postAggregations": [ + { + "type": "thetaSketchEstimate", + "name": "sketchEstimatePostAgg", + "field": { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchIntersectionPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchIntersectionPostAgg", + "func": "INTERSECT", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + ] + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchAnotBPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchAnotBUnionPostAgg", + "func": "NOT", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + ] + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchUnionPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchUnionPostAgg", + "func": "UNION", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + ] + } + } + ], + "intervals": [ + "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" + ] +} diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index b3ae5881c9f..489abeb6c20 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -28,10 +28,12 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; +import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.StringInputRowParser; @@ -57,6 +59,12 @@ import io.druid.query.groupby.GroupByQueryRunnerTest; import io.druid.query.select.SelectQueryEngine; import io.druid.query.select.SelectQueryQueryToolChest; import io.druid.query.select.SelectQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryQueryToolChest; +import io.druid.query.topn.TopNQueryRunnerFactory; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; @@ -74,6 +82,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -192,6 +201,96 @@ public class AggregationTestHelper ); } + public static final AggregationTestHelper createTimeseriesQueryAggregationTestHelper( + List jsonModulesToRegister, + TemporaryFolder tempFolder + ) + { + ObjectMapper mapper = new DefaultObjectMapper(); + + TimeseriesQueryQueryToolChest toolchest = new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + toolchest, + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + + IndexIO indexIO = new IndexIO( + mapper, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + + return new AggregationTestHelper( + mapper, + new IndexMerger(mapper, indexIO), + indexIO, + toolchest, + factory, + tempFolder, + jsonModulesToRegister + ); + } + + public static final AggregationTestHelper createTopNQueryAggregationTestHelper( + List jsonModulesToRegister, + TemporaryFolder tempFolder + ) + { + ObjectMapper mapper = new DefaultObjectMapper(); + + TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + + TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory( + new StupidPool<>( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(10*1024*1024); + } + } + ), + toolchest, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + + IndexIO indexIO = new IndexIO( + mapper, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + + return new AggregationTestHelper( + mapper, + new IndexMerger(mapper, indexIO), + indexIO, + toolchest, + factory, + tempFolder, + jsonModulesToRegister + ); + } + public Sequence createIndexAndRunQueryOnSegment( File inputDataFile, String parserJson,