diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchConstantPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchConstantPostAggregator.java new file mode 100644 index 00000000000..510cb78b80b --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchConstantPostAggregator.java @@ -0,0 +1,133 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation.datasketches.theta; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.codec.digest.DigestUtils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.post.PostAggregatorIds; +import io.druid.query.cache.CacheKeyBuilder; + +/** + */ +public class SketchConstantPostAggregator implements PostAggregator +{ + + private final String name; + private final String value; + private final SketchHolder sketchValue; + + @JsonCreator + public SketchConstantPostAggregator(@JsonProperty("name") String name, @JsonProperty("value") String value) + { + this.name = name; + Preconditions.checkArgument(value != null && !value.isEmpty(), + "Constant value cannot be null or empty, expecting base64 encoded sketch string"); + this.value = value; + this.sketchValue = SketchHolder.deserialize(value); + } + + @Override + public Set getDependentFields() + { + return Collections.emptySet(); + } + + @Override + public Comparator getComparator() + { + return SketchHolder.COMPARATOR; + } + + @Override + public Object compute(Map combinedAggregators) + { + return sketchValue; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @Override + public SketchConstantPostAggregator decorate(Map aggregators) + { + return this; + } + + @JsonProperty("value") + public SketchHolder getSketchValue() + { + return sketchValue; + } + + @Override + public String toString() + { + return "SketchConstantPostAggregator{name='" + name + "', value='" + value + "'}"; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return 
false; + } + SketchConstantPostAggregator that = (SketchConstantPostAggregator) o; + if (!this.sketchValue.equals(that.sketchValue)) { + return false; + } + if (name != null ? !name.equals(that.name) : that.name != null) { + return false; + } + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 37 * result + sketchValue.hashCode(); + return result; + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(PostAggregatorIds.THETA_SKETCH_CONSTANT) + .appendString(DigestUtils.sha1Hex(value)).build(); + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java index cf5f1a3fb13..c273bd4a76a 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -36,6 +36,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import org.apache.commons.codec.binary.Base64; +import java.util.Arrays; import java.util.Comparator; /** @@ -286,6 +287,11 @@ public class SketchHolder } } + /** + * Ideally this would delegate to Sketch's equals and hashCode methods, but those are not value-based implementations. + * SketchHolder nevertheless needs value-based equals and hashCode implementations. + * Hence Arrays.equals() and Arrays.hashCode() are applied to the byte arrays returned by the sketches' toByteArray(). 
+ */ @Override public boolean equals(Object o) { @@ -295,6 +301,12 @@ public class SketchHolder if (o == null || getClass() != o.getClass()) { return false; } - return this.getSketch().equals(((SketchHolder) o).getSketch()); + return Arrays.equals(this.getSketch().toByteArray(), ((SketchHolder) o).getSketch().toByteArray()); + } + + @Override + public int hashCode() + { + return 31 * Arrays.hashCode(this.getSketch().toByteArray()); } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java index 361dd4a2fbf..18d699ec6c5 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -38,6 +38,8 @@ public class SketchModule implements DruidModule public static final String THETA_SKETCH_ESTIMATE_POST_AGG = "thetaSketchEstimate"; public static final String THETA_SKETCH_SET_OP_POST_AGG = "thetaSketchSetOp"; + + public static final String THETA_SKETCH_CONSTANT_POST_AGG = "thetaSketchConstant"; @Override public void configure(Binder binder) @@ -63,7 +65,8 @@ public class SketchModule implements DruidModule .registerSubtypes( new NamedType(SketchMergeAggregatorFactory.class, THETA_SKETCH), new NamedType(SketchEstimatePostAggregator.class, THETA_SKETCH_ESTIMATE_POST_AGG), - new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG) + new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG), + new NamedType(SketchConstantPostAggregator.class, THETA_SKETCH_CONSTANT_POST_AGG) ) .addSerializer( SketchHolder.class, new SketchHolderJsonSerializer() diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java 
b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 260dc12b350..b5eb3cb0f6c 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -277,6 +277,14 @@ public class SketchAggregationTest 2 ) ); + + assertPostAggregatorSerde( + new SketchEstimatePostAggregator( + "name", + new SketchConstantPostAggregator("name", "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI="), + null + ) + ); } @Test @@ -293,6 +301,18 @@ public class SketchAggregationTest ) ) ); + + assertPostAggregatorSerde( + new SketchSetPostAggregator( + "name", + "INTERSECT", + null, + Lists.newArrayList( + new FieldAccessPostAggregator("name1", "fieldName1"), + new SketchConstantPostAggregator("name2", "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=") + ) + ) + ); } @Test diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java index 870fa9794b0..d38166b4b91 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java @@ -276,6 +276,51 @@ public class SketchAggregationWithSimpleDataTest Assert.assertEquals(100, result.getValue().getEvents().size()); Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country")); } + + @Test + public void testTopNQueryWithSketchConstant() throws Exception + { + AggregationTestHelper topNQueryAggregationTestHelper = 
AggregationTestHelper.createTopNQueryAggregationTestHelper( + sm.getJacksonModules(), + tempFolder + ); + + Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments( + ImmutableList.of(s1, s2), + readFileFromClasspathAsString("topn_query_sketch_const.json") + ); + + Result result = (Result) Iterables.getOnlyElement(seq.toList()); + + Assert.assertEquals(DateTimes.of("2014-10-20T00:00:00.000Z"), result.getTimestamp()); + + DimensionAndMetricValueExtractor value1 = Iterables.get(result.getValue().getValue(), 0); + Assert.assertEquals(38.0, value1.getDoubleMetric("sketch_count"), 0.01); + Assert.assertEquals(38.0, value1.getDoubleMetric("sketchEstimatePostAgg"), 0.01); + Assert.assertEquals(2.0, value1.getDoubleMetric("sketchEstimatePostAggForSketchConstant"), 0.01); + Assert.assertEquals(39.0, value1.getDoubleMetric("sketchUnionPostAggEstimate"), 0.01); + Assert.assertEquals(1.0, value1.getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01); + Assert.assertEquals(37.0, value1.getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01); + Assert.assertEquals("product_3", value1.getDimensionValue("product")); + + DimensionAndMetricValueExtractor value2 = Iterables.get(result.getValue().getValue(), 1); + Assert.assertEquals(42.0, value2.getDoubleMetric("sketch_count"), 0.01); + Assert.assertEquals(42.0, value2.getDoubleMetric("sketchEstimatePostAgg"), 0.01); + Assert.assertEquals(2.0, value2.getDoubleMetric("sketchEstimatePostAggForSketchConstant"), 0.01); + Assert.assertEquals(42.0, value2.getDoubleMetric("sketchUnionPostAggEstimate"), 0.01); + Assert.assertEquals(2.0, value2.getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01); + Assert.assertEquals(40.0, value2.getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01); + Assert.assertEquals("product_1", value2.getDimensionValue("product")); + + DimensionAndMetricValueExtractor value3 = Iterables.get(result.getValue().getValue(), 2); + Assert.assertEquals(42.0, value3.getDoubleMetric("sketch_count"), 0.01); + 
Assert.assertEquals(42.0, value3.getDoubleMetric("sketchEstimatePostAgg"), 0.01); + Assert.assertEquals(2.0, value3.getDoubleMetric("sketchEstimatePostAggForSketchConstant"), 0.01); + Assert.assertEquals(42.0, value3.getDoubleMetric("sketchUnionPostAggEstimate"), 0.01); + Assert.assertEquals(2.0, value3.getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01); + Assert.assertEquals(40.0, value3.getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01); + Assert.assertEquals("product_2", value3.getDimensionValue("product")); + } public static final String readFileFromClasspathAsString(String fileName) throws IOException { diff --git a/extensions-core/datasketches/src/test/resources/topn_query_sketch_const.json b/extensions-core/datasketches/src/test/resources/topn_query_sketch_const.json new file mode 100644 index 00000000000..3dc47dca86c --- /dev/null +++ b/extensions-core/datasketches/src/test/resources/topn_query_sketch_const.json @@ -0,0 +1,104 @@ +{ + "queryType": "topN", + "dataSource": "test_datasource", + "granularity":"ALL", + "metric": { + "type": "inverted", + "metric": "sketch_count" + }, + "dimension": "product", + "threshold": 3, + "aggregations": [ + { + "type": "thetaSketch", + "name": "sketch_count", + "fieldName": "pty_country", + "size": 16384 + } + ], + "postAggregations": [ + { + "type": "thetaSketchEstimate", + "name": "sketchEstimatePostAgg", + "field": { + "type": "fieldAccess", + "fieldName": "sketch_count" + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchEstimatePostAggForSketchConstant", + "field": { + "type": "thetaSketchConstant", + "name": "theta_sketch_count", + "value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=" + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchIntersectionPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchIntersectionPostAgg", + "func": "INTERSECT", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": 
"thetaSketchConstant", + "name": "theta_sketch_count", + "value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=" + } + ] + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchAnotBPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchAnotBUnionPostAgg", + "func": "NOT", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "thetaSketchConstant", + "name": "theta_sketch_count", + "value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=" + } + ] + } + }, + { + "type": "thetaSketchEstimate", + "name": "sketchUnionPostAggEstimate", + "field": { + "type": "thetaSketchSetOp", + "name": "sketchUnionPostAgg", + "func": "UNION", + "size": 16384, + "fields": [ + { + "type": "fieldAccess", + "fieldName": "sketch_count" + }, + { + "type": "thetaSketchConstant", + "name": "theta_sketch_count", + "value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=" + } + ] + } + } + ], + "intervals": [ + "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" + ] +} diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java index b406b83a0fe..80e665b9d83 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java @@ -46,8 +46,7 @@ public class ConstantPostAggregator implements PostAggregator ) { this.name = name; - this.constantValue = constantValue; - Preconditions.checkNotNull(this.constantValue); + this.constantValue = Preconditions.checkNotNull(constantValue, "Constant value cannot be null"); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/io/druid/query/aggregation/post/PostAggregatorIds.java index bfb7d4df108..3ef660c2cbe 100644 --- 
a/processing/src/main/java/io/druid/query/aggregation/post/PostAggregatorIds.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/PostAggregatorIds.java @@ -44,4 +44,5 @@ public class PostAggregatorIds public static final byte FINALIZING_FIELD_ACCESS = 20; public static final byte ZTEST = 21; public static final byte PVALUE_FROM_ZTEST = 22; + public static final byte THETA_SKETCH_CONSTANT = 23; }