diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java index da7d2fd7aaa..65169cb5855 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java @@ -79,17 +79,23 @@ public class HyperUniquesSerde extends ComplexMetricSerde @Override public HyperLogLogCollector extractValue(InputRow inputRow, String metricName) { - HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + Object rawValue = inputRow.getRaw(metricName); - List dimValues = inputRow.getDimension(metricName); - if (dimValues == null) { + if (rawValue instanceof HyperLogLogCollector) { + return (HyperLogLogCollector) inputRow.getRaw(metricName); + } else { + HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + + List dimValues = inputRow.getDimension(metricName); + if (dimValues == null) { + return collector; + } + + for (String dimensionValue : dimValues) { + collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes()); + } return collector; } - - for (String dimensionValue : dimValues) { - collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes()); - } - return collector; } }; } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index c70492e3340..97e64a0ec0c 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -35,6 +35,7 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; +import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -46,7 +47,6 @@ import io.druid.query.aggregation.MaxAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.ExtractionDimensionSpec; -import io.druid.query.extraction.PartialDimExtractionFn; import io.druid.query.extraction.RegexDimExtractionFn; import io.druid.query.filter.JavaScriptDimFilter; import io.druid.query.filter.RegexDimFilter; @@ -64,7 +64,6 @@ import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -181,6 +180,36 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testGroupByWithUniques() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + QueryRunnerTestHelper.qualityUniques + ) + ) + .setGranularity(QueryRunnerTestHelper.allGran) + .build(); + + List expectedResults = Arrays.asList( + createExpectedRow( + "2011-04-01", + "rows", + 26L, + "uniques", + QueryRunnerTestHelper.UNIQUES_9 + ) + ); + + Iterable results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testGroupByWithDimExtractionFn() { @@ -188,7 +217,15 @@ public class GroupByQueryRunnerTest .builder() .setDataSource(QueryRunnerTestHelper.dataSource) .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setDimensions(Lists.newArrayList(new ExtractionDimensionSpec("quality", "alias", new RegexDimExtractionFn("(\\w{1})")))) + .setDimensions( + Lists.newArrayList( + new ExtractionDimensionSpec( + "quality", + "alias", + new RegexDimExtractionFn("(\\w{1})") + ) + ) + ) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, @@ -228,33 +265,33 @@ public class GroupByQueryRunnerTest DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); GroupByQuery query = GroupByQuery.builder() - .setDataSource(QueryRunnerTestHelper.dataSource) - .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") - .setDimensions( - Lists.newArrayList( - (DimensionSpec) new DefaultDimensionSpec( - "quality", - "alias" - ) - ) - ) - .setAggregatorSpecs( - Arrays.asList( - QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory( - "idx", - "index" - ) - ) - ) - .setGranularity( - new PeriodGranularity( - new Period("P1D"), - null, - tz - ) - ) - .build(); + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") + .setDimensions( + Lists.newArrayList( + (DimensionSpec) new DefaultDimensionSpec( + "quality", + "alias" + ) + ) + ) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .setGranularity( + new PeriodGranularity( + new Period("P1D"), + null, + tz + ) + ) + .build(); List expectedResults = Arrays.asList( createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L), @@ -993,8 +1030,14 @@ public class GroupByQueryRunnerTest private Iterable runQuery(GroupByQuery query) { - QueryToolChest toolChest = factory.getToolchest(); - Sequence queryResult = toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)).run(query); + + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), + toolChest + ); + + Sequence queryResult = theRunner.run(query); return Sequences.toList(queryResult, Lists.newArrayList()); }