Merge pull request #2710 from himanshug/fix_sketch_estimate_comparator

Fix SketchEstimate post aggregator's getComparator() and update the tests to verify the fix
This commit is contained in:
Fangjin Yang 2016-03-24 13:34:32 -07:00
commit d4a96843f9
4 changed files with 2430 additions and 2290 deletions

View File

@ -22,7 +22,9 @@ package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
@ -58,9 +60,23 @@ public class SketchEstimatePostAggregator implements PostAggregator
} }
@Override @Override
public Comparator<Sketch> getComparator() public Comparator getComparator()
{ {
return SketchAggregatorFactory.COMPARATOR; if (errorBoundsStdDev == null) {
return Ordering.natural();
} else {
return new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Doubles.compare(
((SketchEstimateWithErrorBounds) o1).getEstimate(),
((SketchEstimateWithErrorBounds) o2).getEstimate()
);
}
};
}
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package io.druid.query.aggregation.datasketches.theta; package io.druid.query.aggregation.datasketches.theta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -77,21 +78,76 @@ public class SketchAggregationTest
); );
List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList()); List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList());
Assert.assertEquals(1, results.size()); Assert.assertEquals(5, results.size());
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(
new MapBasedRow( new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"), DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap ImmutableMap
.<String, Object>builder() .<String, Object>builder()
.put("sketch_count", 50.0) .put("product", "product_3")
.put("sketchEstimatePostAgg", 50.0) .put("sketch_count", 38.0)
.put("sketchUnionPostAggEstimate", 50.0) .put("sketchEstimatePostAgg", 38.0)
.put("sketchIntersectionPostAggEstimate", 50.0) .put("sketchUnionPostAggEstimate", 38.0)
.put("sketchIntersectionPostAggEstimate", 38.0)
.put("sketchAnotBPostAggEstimate", 0.0) .put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0) .put("non_existing_col_validation", 0.0)
.build() .build()
), ),
results.get(0) new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_1")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_2")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_4")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_5")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
)
),
results
); );
} }
@ -118,7 +174,7 @@ public class SketchAggregationTest
Result<SelectResultValue> result = (Result<SelectResultValue>) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList())); Result<SelectResultValue> result = (Result<SelectResultValue>) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList()));
Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp()); Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp());
Assert.assertEquals(100, result.getValue().getEvents().size()); Assert.assertEquals(100, result.getValue().getEvents().size());
Assert.assertEquals("AgMDAAAazJMBAAAAAACAPzz9j7pWTMdR", result.getValue().getEvents().get(0).getEvent().get("pty_country")); Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country"));
} }
@Test @Test
@ -176,21 +232,76 @@ public class SketchAggregationTest
); );
List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList()); List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList());
Assert.assertEquals(1, results.size()); Assert.assertEquals(5, results.size());
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(
new MapBasedRow( new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"), DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap ImmutableMap
.<String, Object>builder() .<String, Object>builder()
.put("sketch_count", 50.0) .put("product", "product_3")
.put("sketchEstimatePostAgg", 50.0) .put("sketch_count", 38.0)
.put("sketchUnionPostAggEstimate", 50.0) .put("sketchEstimatePostAgg", 38.0)
.put("sketchIntersectionPostAggEstimate", 50.0) .put("sketchUnionPostAggEstimate", 38.0)
.put("sketchIntersectionPostAggEstimate", 38.0)
.put("sketchAnotBPostAggEstimate", 0.0) .put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0) .put("non_existing_col_validation", 0.0)
.build() .build()
), ),
results.get(0) new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_1")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_2")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_4")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_5")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
)
),
results
); );
} }

View File

@ -2,7 +2,7 @@
"queryType": "groupBy", "queryType": "groupBy",
"dataSource": "test_datasource", "dataSource": "test_datasource",
"granularity":"ALL", "granularity":"ALL",
"dimensions": [], "dimensions": ["product"],
"aggregations": [ "aggregations": [
{ {
"type": "thetaSketch", "type": "thetaSketch",
@ -89,5 +89,18 @@
], ],
"intervals": [ "intervals": [
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
],
"limitSpec": {
"type": "default",
"columns": [
{
"dimension": "sketchEstimatePostAgg",
"direction": "ASC"
},
{
"dimension": "product",
"direction": "ASC"
}
] ]
} }
}