Add comparator to HyperUniquesFinalizingPostAggregator.

This makes it possible to do groupBys with clauses like "HAVING uniques > 10".
Beforehand you couldn't do it with either an aggregator (because it returns
an HLLV1 which the havingSpec can't understand) or a finalized postaggregator
(because it didn't have a comparator).

Now you can at least do it with a finalizing postaggregator. Trying it with
the aggregator alone still doesn't work.

Added some topN and groupBy tests verifying the comparator, and added an
@Ignore test that should pass if havingSpecs are made work on the aggregator
directly.
This commit is contained in:
Gian Merlino 2016-02-18 11:45:09 -08:00
parent 1c4cfd5829
commit d25c46cb9f
4 changed files with 343 additions and 3 deletions

View File

@ -55,7 +55,7 @@ public class Queries
missing.isEmpty(), missing.isEmpty(),
"Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName() "Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
); );
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined"); Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName());
} }
} }
} }

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
@ -33,6 +34,15 @@ import java.util.Set;
*/ */
public class HyperUniqueFinalizingPostAggregator implements PostAggregator public class HyperUniqueFinalizingPostAggregator implements PostAggregator
{ {
private static final Comparator<Double> DOUBLE_COMPARATOR = Ordering.from(new Comparator<Double>()
{
@Override
public int compare(Double lhs, Double rhs)
{
return Double.compare(lhs, rhs);
}
}).nullsFirst();
private final String name; private final String name;
private final String fieldName; private final String fieldName;
@ -56,9 +66,9 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
} }
@Override @Override
public Comparator getComparator() public Comparator<Double> getComparator()
{ {
throw new UnsupportedOperationException(); return DOUBLE_COMPARATOR;
} }
@Override @Override

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool; import io.druid.collections.StupidPool;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity; import io.druid.granularity.PeriodGranularity;
@ -90,7 +91,9 @@ import org.joda.time.Period;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -114,6 +117,9 @@ public class GroupByQueryRunnerTest
private GroupByQueryRunnerFactory factory; private GroupByQueryRunnerFactory factory;
private Supplier<GroupByQueryConfig> configSupplier; private Supplier<GroupByQueryConfig> configSupplier;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
@ -1650,6 +1656,282 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
} }
@Test
public void testGroupByWithOrderOnHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.uniqueMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"upfront",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"total_market",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithHavingOnHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.uniqueMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setHavingSpec(
new GreaterThanHavingSpec(
QueryRunnerTestHelper.uniqueMetric,
8
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
)
);
// havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized.
// See also: https://github.com/druid-io/druid/issues/2507
expectedException.expect(ParseException.class);
expectedException.expectMessage("Unknown type[class io.druid.query.aggregation.hyperloglog.HLLCV1]");
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithHavingOnFinalizedHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setHavingSpec(
new GreaterThanHavingSpec(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
8
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithLimitOnFinalizedHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"upfront",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"total_market",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test @Test
public void testGroupByWithAlphaNumericDimensionOrder() public void testGroupByWithAlphaNumericDimensionOrder()
{ {

View File

@ -46,6 +46,7 @@ import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.ExtractionDimensionSpec; import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.extraction.DimExtractionFn; import io.druid.query.extraction.DimExtractionFn;
@ -394,6 +395,53 @@ public class TopNQueryRunnerTest
assertExpectedResults(expectedResults, query); assertExpectedResults(expectedResults, query);
} }
@Test
public void testTopNOverHyperUniqueFinalizingPostAggregator()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric)
.threshold(3)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.qualityUniques)
)
.postAggregators(
Arrays.<PostAggregator>asList(new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
))
)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_9)
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_9)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_2)
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_2)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_2)
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_2)
.build()
)
)
)
);
assertExpectedResults(expectedResults, query);
}
@Test @Test
public void testTopNBySegment() public void testTopNBySegment()