Adding feature thetaSketchConstant to do some set operation in PostAgg (#5551)

* Adding feature thetaSketchConstant to do some set operation in PostAggregator

* Updated review comments for PR #5551 - Adding thetaSketchConstant

* Fixed CI build issue

* Updated review comments 2 for PR #5551 - Adding thetaSketchConstant
This commit is contained in:
Senthil Kumar L S 2018-04-06 11:26:59 +05:30 committed by Gian Merlino
parent 270fd1ea15
commit 371c672828
8 changed files with 321 additions and 4 deletions

View File

@ -0,0 +1,133 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.codec.digest.DigestUtils;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
/**
 * Post-aggregator that always yields one fixed sketch, supplied in the query as a string.
 *
 * <p>The string (expected to be a base64 encoded sketch, per the constructor's validation message) is
 * converted once, at construction time, through {@link SketchHolder#deserialize}; every call to
 * {@link #compute(Map)} then returns that same holder. The post-aggregator reads no aggregator output,
 * so it reports no dependent fields.
 */
public class SketchConstantPostAggregator implements PostAggregator
{
  /** Output name of this post-aggregator; may be null, nothing here validates it. */
  private final String name;

  /** The raw constant exactly as received; used for {@link #toString()} and the cache key. */
  private final String value;

  /** The constant after deserialization; used for compute, equality and hashing. */
  private final SketchHolder sketchValue;

  /**
   * @param name  output name of the post-aggregator
   * @param value the constant sketch as a non-empty string
   *
   * @throws IllegalArgumentException if {@code value} is null or empty
   */
  @JsonCreator
  public SketchConstantPostAggregator(@JsonProperty("name") String name, @JsonProperty("value") String value)
  {
    Preconditions.checkArgument(
        !(value == null || value.isEmpty()),
        "Constant value cannot be null or empty, expecting base64 encoded sketch string"
    );
    this.name = name;
    this.value = value;
    this.sketchValue = SketchHolder.deserialize(value);
  }

  /**
   * @return an empty set, since the constant does not read any aggregator or post-aggregator output
   */
  @Override
  public Set<String> getDependentFields()
  {
    return Collections.emptySet();
  }

  /**
   * @return the shared {@link SketchHolder#COMPARATOR}
   */
  @Override
  public Comparator<Object> getComparator()
  {
    return SketchHolder.COMPARATOR;
  }

  /**
   * Returns the sketch built in the constructor.
   *
   * @param combinedAggregators ignored
   *
   * @return the constant {@link SketchHolder}
   */
  @Override
  public Object compute(Map<String, Object> combinedAggregators)
  {
    return sketchValue;
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  /**
   * @param aggregators ignored
   *
   * @return this same instance; there is nothing to decorate
   */
  @Override
  public SketchConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
  {
    return this;
  }

  /**
   * Exposes the deserialized sketch under the JSON property {@code value}.
   * NOTE(review): the JSON form depends on the serializer registered for {@link SketchHolder};
   * round-tripping through the constructor presumably relies on it emitting a compatible string.
   *
   * @return the constant {@link SketchHolder}
   */
  @JsonProperty("value")
  public SketchHolder getSketchValue()
  {
    return sketchValue;
  }

  @Override
  public String toString()
  {
    return "SketchConstantPostAggregator{name='"
           + name
           + "', value='"
           + value
           + "'}";
  }

  /**
   * Two instances are equal when they are of the same class, hold equal sketches and have equal
   * (possibly null) names. The raw {@code value} string itself is not compared.
   */
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final SketchConstantPostAggregator other = (SketchConstantPostAggregator) o;
    // Sketch first, then name, with a null name only matching another null name.
    return sketchValue.equals(other.sketchValue)
           && (name == null ? other.name == null : name.equals(other.name));
  }

  @Override
  public int hashCode()
  {
    final int nameHash = name == null ? 0 : name.hashCode();
    return 37 * nameHash + sketchValue.hashCode();
  }

  /**
   * Builds the cache key from the {@link PostAggregatorIds#THETA_SKETCH_CONSTANT} type id followed by
   * the SHA-1 hex digest of the raw constant string.
   */
  @Override
  public byte[] getCacheKey()
  {
    final String valueDigest = DigestUtils.sha1Hex(value);
    return new CacheKeyBuilder(PostAggregatorIds.THETA_SKETCH_CONSTANT)
        .appendString(valueDigest)
        .build();
  }
}

View File

@ -36,6 +36,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import org.apache.commons.codec.binary.Base64;
import java.util.Arrays;
import java.util.Comparator;
/**
@ -286,6 +287,11 @@ public class SketchHolder
}
}
/**
 * Ideally this would delegate to Sketch's own equals and hashCode methods, but those are not
 * value-based implementations. SketchHolder needs value-based equals and hashCode, so both are
 * computed from the sketch's serialized bytes using Arrays.equals() and Arrays.hashCode().
 */
@Override
public boolean equals(Object o)
{
@ -295,6 +301,12 @@ public class SketchHolder
if (o == null || getClass() != o.getClass()) {
return false;
}
return this.getSketch().equals(((SketchHolder) o).getSketch());
return Arrays.equals(this.getSketch().toByteArray(), ((SketchHolder) o).getSketch().toByteArray());
}
@Override
public int hashCode()
{
return 31 * Arrays.hashCode(this.getSketch().toByteArray());
}
}

View File

@ -39,6 +39,8 @@ public class SketchModule implements DruidModule
public static final String THETA_SKETCH_ESTIMATE_POST_AGG = "thetaSketchEstimate";
public static final String THETA_SKETCH_SET_OP_POST_AGG = "thetaSketchSetOp";
public static final String THETA_SKETCH_CONSTANT_POST_AGG = "thetaSketchConstant";
@Override
public void configure(Binder binder)
{
@ -63,7 +65,8 @@ public class SketchModule implements DruidModule
.registerSubtypes(
new NamedType(SketchMergeAggregatorFactory.class, THETA_SKETCH),
new NamedType(SketchEstimatePostAggregator.class, THETA_SKETCH_ESTIMATE_POST_AGG),
new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG)
new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG),
new NamedType(SketchConstantPostAggregator.class, THETA_SKETCH_CONSTANT_POST_AGG)
)
.addSerializer(
SketchHolder.class, new SketchHolderJsonSerializer()

View File

@ -277,6 +277,14 @@ public class SketchAggregationTest
2
)
);
assertPostAggregatorSerde(
new SketchEstimatePostAggregator(
"name",
new SketchConstantPostAggregator("name", "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI="),
null
)
);
}
@Test
@ -293,6 +301,18 @@ public class SketchAggregationTest
)
)
);
assertPostAggregatorSerde(
new SketchSetPostAggregator(
"name",
"INTERSECT",
null,
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("name1", "fieldName1"),
new SketchConstantPostAggregator("name2", "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=")
)
)
);
}
@Test

View File

@ -277,6 +277,51 @@ public class SketchAggregationWithSimpleDataTest
Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country"));
}
/**
 * Runs the topN query in "topn_query_sketch_const.json" against both test segments and checks, for each
 * of the three returned rows, the aggregated sketch estimate together with the estimates obtained by
 * combining it with the constant sketch (constant alone, UNION, INTERSECT and NOT).
 */
@Test
public void testTopNQueryWithSketchConstant() throws Exception
{
  final AggregationTestHelper helper = AggregationTestHelper.createTopNQueryAggregationTestHelper(
      sm.getJacksonModules(),
      tempFolder
  );
  final Sequence resultSequence = helper.runQueryOnSegments(
      ImmutableList.of(s1, s2),
      readFileFromClasspathAsString("topn_query_sketch_const.json")
  );

  final Result<TopNResultValue> result =
      (Result<TopNResultValue>) Iterables.getOnlyElement(resultSequence.toList());
  Assert.assertEquals(DateTimes.of("2014-10-20T00:00:00.000Z"), result.getTimestamp());

  assertSketchConstantRow(Iterables.get(result.getValue().getValue(), 0), 38.0, 39.0, 1.0, 37.0, "product_3");
  assertSketchConstantRow(Iterables.get(result.getValue().getValue(), 1), 42.0, 42.0, 2.0, 40.0, "product_1");
  assertSketchConstantRow(Iterables.get(result.getValue().getValue(), 2), 42.0, 42.0, 2.0, 40.0, "product_2");
}

/**
 * Asserts the metrics of one topN result row produced by the sketch-constant query.
 *
 * @param row          the result row to inspect
 * @param sketchCount  expected value of both "sketch_count" and "sketchEstimatePostAgg"
 * @param union        expected "sketchUnionPostAggEstimate"
 * @param intersection expected "sketchIntersectionPostAggEstimate"
 * @param aNotB        expected "sketchAnotBPostAggEstimate"
 * @param product      expected value of the "product" dimension
 */
private static void assertSketchConstantRow(
    DimensionAndMetricValueExtractor row,
    double sketchCount,
    double union,
    double intersection,
    double aNotB,
    String product
)
{
  Assert.assertEquals(sketchCount, row.getDoubleMetric("sketch_count"), 0.01);
  Assert.assertEquals(sketchCount, row.getDoubleMetric("sketchEstimatePostAgg"), 0.01);
  // The estimate of the constant sketch alone is expected to be 2.0 in every row.
  Assert.assertEquals(2.0, row.getDoubleMetric("sketchEstimatePostAggForSketchConstant"), 0.01);
  Assert.assertEquals(union, row.getDoubleMetric("sketchUnionPostAggEstimate"), 0.01);
  Assert.assertEquals(intersection, row.getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01);
  Assert.assertEquals(aNotB, row.getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01);
  Assert.assertEquals(product, row.getDimensionValue("product"));
}
public static final String readFileFromClasspathAsString(String fileName) throws IOException
{
return Files.asCharSource(

View File

@ -0,0 +1,104 @@
{
"queryType": "topN",
"dataSource": "test_datasource",
"granularity":"ALL",
"metric": {
"type": "inverted",
"metric": "sketch_count"
},
"dimension": "product",
"threshold": 3,
"aggregations": [
{
"type": "thetaSketch",
"name": "sketch_count",
"fieldName": "pty_country",
"size": 16384
}
],
"postAggregations": [
{
"type": "thetaSketchEstimate",
"name": "sketchEstimatePostAgg",
"field": {
"type": "fieldAccess",
"fieldName": "sketch_count"
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchEstimatePostAggForSketchConstant",
"field": {
"type": "thetaSketchConstant",
"name": "theta_sketch_count",
"value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI="
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchIntersectionPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchIntersectionPostAgg",
"func": "INTERSECT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "thetaSketchConstant",
"name": "theta_sketch_count",
"value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI="
}
]
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchAnotBPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchAnotBUnionPostAgg",
"func": "NOT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "thetaSketchConstant",
"name": "theta_sketch_count",
"value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI="
}
]
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchUnionPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchUnionPostAgg",
"func": "UNION",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "thetaSketchConstant",
"name": "theta_sketch_count",
"value": "AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI="
}
]
}
}
],
"intervals": [
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
]
}

View File

@ -46,8 +46,7 @@ public class ConstantPostAggregator implements PostAggregator
)
{
this.name = name;
this.constantValue = constantValue;
Preconditions.checkNotNull(this.constantValue);
this.constantValue = Preconditions.checkNotNull(constantValue, "Constant value cannot be null");
}
@Override

View File

@ -44,4 +44,5 @@ public class PostAggregatorIds
public static final byte FINALIZING_FIELD_ACCESS = 20;
public static final byte ZTEST = 21;
public static final byte PVALUE_FROM_ZTEST = 22;
public static final byte THETA_SKETCH_CONSTANT = 23;
}