Merge pull request #2496 from gianm/hll-finalizing-comparator

Add comparator to HyperUniquesFinalizingPostAggregator.
This commit is contained in:
Fangjin Yang 2016-02-19 13:03:43 -08:00
commit 8a41f03377
4 changed files with 343 additions and 3 deletions

View File

@ -55,7 +55,7 @@ public class Queries
missing.isEmpty(),
"Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
);
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined");
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName());
}
}
}

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.PostAggregator;
@ -33,6 +34,15 @@ import java.util.Set;
*/
public class HyperUniqueFinalizingPostAggregator implements PostAggregator
{
private static final Comparator<Double> DOUBLE_COMPARATOR = Ordering.from(new Comparator<Double>()
{
@Override
public int compare(Double lhs, Double rhs)
{
return Double.compare(lhs, rhs);
}
}).nullsFirst();
private final String name;
private final String fieldName;
@ -56,9 +66,9 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
}
@Override
public Comparator getComparator()
public Comparator<Double> getComparator()
{
throw new UnsupportedOperationException();
return DOUBLE_COMPARATOR;
}
@Override

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity;
@ -90,7 +91,9 @@ import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -114,6 +117,9 @@ public class GroupByQueryRunnerTest
private GroupByQueryRunnerFactory factory;
private Supplier<GroupByQueryConfig> configSupplier;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp() throws Exception
{
@ -1650,6 +1656,282 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithOrderOnHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.uniqueMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"upfront",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"total_market",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithHavingOnHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.uniqueMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setHavingSpec(
new GreaterThanHavingSpec(
QueryRunnerTestHelper.uniqueMetric,
8
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
)
);
// havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized.
// See also: https://github.com/druid-io/druid/issues/2507
expectedException.expect(ParseException.class);
expectedException.expectMessage("Unknown type[class io.druid.query.aggregation.hyperloglog.HLLCV1]");
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithHavingOnFinalizedHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setHavingSpec(
new GreaterThanHavingSpec(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
8
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithLimitOnFinalizedHyperUnique()
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.marketDimension
)
)
)
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
OrderByColumnSpec.Direction.DESCENDING
)
), 3
)
)
.setAggregatorSpecs(
Lists.<AggregatorFactory>newArrayList(
QueryRunnerTestHelper.qualityUniques
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
)
)
)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"spot",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_9
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"upfront",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"market",
"total_market",
QueryRunnerTestHelper.uniqueMetric,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.UNIQUES_2
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test
public void testGroupByWithAlphaNumericDimensionOrder()
{

View File

@ -46,6 +46,7 @@ import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.extraction.DimExtractionFn;
@ -394,6 +395,53 @@ public class TopNQueryRunnerTest
assertExpectedResults(expectedResults, query);
}
@Test
public void testTopNOverHyperUniqueFinalizingPostAggregator()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric)
.threshold(3)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.qualityUniques)
)
.postAggregators(
Arrays.<PostAggregator>asList(new HyperUniqueFinalizingPostAggregator(
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
QueryRunnerTestHelper.uniqueMetric
))
)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_9)
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_9)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_2)
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_2)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_2)
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_2)
.build()
)
)
)
);
assertExpectedResults(expectedResults, query);
}
@Test
public void testTopNBySegment()