Merge pull request #489 from metamx/fix-groupby

Make hyperUniques work for groupBy
xvrl 2014-04-17 14:46:45 -07:00
commit d0e65684d4
2 changed files with 89 additions and 40 deletions


@@ -79,17 +79,23 @@ public class HyperUniquesSerde extends ComplexMetricSerde
       @Override
       public HyperLogLogCollector extractValue(InputRow inputRow, String metricName)
       {
-        HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
-        List<String> dimValues = inputRow.getDimension(metricName);
-        if (dimValues == null) {
-          return collector;
-        }
-        for (String dimensionValue : dimValues) {
-          collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes());
-        }
-        return collector;
+        Object rawValue = inputRow.getRaw(metricName);
+        if (rawValue instanceof HyperLogLogCollector) {
+          return (HyperLogLogCollector) inputRow.getRaw(metricName);
+        } else {
+          HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
+          List<String> dimValues = inputRow.getDimension(metricName);
+          if (dimValues == null) {
+            return collector;
+          }
+          for (String dimensionValue : dimValues) {
+            collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes());
+          }
+          return collector;
+        }
       }
     };
   }
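The hunk above changes HyperUniquesSerde.extractValue so that a value which is already a HyperLogLogCollector (for example, a pre-aggregated hyperUnique column handed back through the groupBy path) is returned as-is, and only raw dimension strings get hashed into a fresh collector. Below is a minimal, self-contained sketch of that dispatch, assuming a hypothetical Sketch class in place of HyperLogLogCollector; none of these names are Druid's actual API.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExtractValueSketch
{
  // Hypothetical stand-in for a mergeable distinct-count sketch such as HyperLogLogCollector.
  static class Sketch
  {
    final Set<Integer> hashes = new HashSet<>();

    void add(byte[] hashedValue)
    {
      hashes.add(Arrays.hashCode(hashedValue));
    }
  }

  // If the row already carries a built sketch (e.g. output of an earlier aggregation pass),
  // pass it through unchanged; otherwise fold the raw string values into a new sketch.
  static Sketch extractValue(Object rawValue, List<String> dimValues)
  {
    if (rawValue instanceof Sketch) {
      return (Sketch) rawValue;
    }
    Sketch sketch = new Sketch();
    if (dimValues == null) {
      return sketch;
    }
    for (String value : dimValues) {
      // The real serde hashes each value with a proper hash function before adding.
      sketch.add(value.getBytes(StandardCharsets.UTF_8));
    }
    return sketch;
  }

  public static void main(String[] args)
  {
    Sketch prebuilt = new Sketch();
    prebuilt.add("x".getBytes(StandardCharsets.UTF_8));

    // A pre-built sketch is returned as-is instead of being re-hashed as a string.
    System.out.println(extractValue(prebuilt, null) == prebuilt);                       // true

    // Raw string values are folded into a fresh sketch.
    System.out.println(extractValue(null, Arrays.asList("a", "b", "a")).hashes.size()); // 2
  }
}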


@@ -35,6 +35,7 @@ import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.granularity.PeriodGranularity;
 import io.druid.granularity.QueryGranularity;
+import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerTestHelper;
@@ -46,7 +47,6 @@ import io.druid.query.aggregation.MaxAggregatorFactory;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.PartialDimExtractionFn;
 import io.druid.query.extraction.RegexDimExtractionFn;
 import io.druid.query.filter.JavaScriptDimFilter;
 import io.druid.query.filter.RegexDimFilter;
@@ -64,7 +64,6 @@ import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -181,6 +180,36 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
 
+  @Test
+  public void testGroupByWithUniques()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                QueryRunnerTestHelper.qualityUniques
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow(
+            "2011-04-01",
+            "rows",
+            26L,
+            "uniques",
+            QueryRunnerTestHelper.UNIQUES_9
+        )
+    );
+
+    Iterable<Row> results = runQuery(query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
   @Test
   public void testGroupByWithDimExtractionFn()
   {
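For reference, the expected rows in the new test above are built from a timestamp followed by alternating column-name/value varargs, and the "uniques" value is compared against a precomputed HLL estimate (QueryRunnerTestHelper.UNIQUES_9, close to 9) rather than an exact count. Here is a hedged, self-contained sketch of what a helper like createExpectedRow presumably does; the real helper constructs a Druid row object, and the names below are illustrative only.

import java.util.LinkedHashMap;
import java.util.Map;

public class ExpectedRowSketch
{
  // Hypothetical stand-in for createExpectedRow: pairs varargs into a column map.
  static Map<String, Object> expectedRow(String timestamp, Object... columns)
  {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("timestamp", timestamp);
    for (int i = 0; i < columns.length; i += 2) {
      event.put((String) columns[i], columns[i + 1]);
    }
    return event;
  }

  public static void main(String[] args)
  {
    // Mirrors the expectation in testGroupByWithUniques: 26 rows and an approximate
    // unique count of the "quality" dimension (the 9.0 here stands in for UNIQUES_9).
    System.out.println(expectedRow("2011-04-01", "rows", 26L, "uniques", 9.0));
    // {timestamp=2011-04-01, rows=26, uniques=9.0}
  }
}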
@@ -188,7 +217,15 @@ public class GroupByQueryRunnerTest
         .builder()
         .setDataSource(QueryRunnerTestHelper.dataSource)
         .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
-        .setDimensions(Lists.<DimensionSpec>newArrayList(new ExtractionDimensionSpec("quality", "alias", new RegexDimExtractionFn("(\\w{1})"))))
+        .setDimensions(
+            Lists.<DimensionSpec>newArrayList(
+                new ExtractionDimensionSpec(
+                    "quality",
+                    "alias",
+                    new RegexDimExtractionFn("(\\w{1})")
+                )
+            )
+        )
         .setAggregatorSpecs(
             Arrays.<AggregatorFactory>asList(
                 QueryRunnerTestHelper.rowsCount,
@@ -228,33 +265,33 @@ public class GroupByQueryRunnerTest
     DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
     GroupByQuery query = GroupByQuery.builder()
                                      .setDataSource(QueryRunnerTestHelper.dataSource)
                                      .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00")
                                      .setDimensions(
                                          Lists.newArrayList(
                                              (DimensionSpec) new DefaultDimensionSpec(
                                                  "quality",
                                                  "alias"
                                              )
                                          )
                                      )
                                      .setAggregatorSpecs(
                                          Arrays.<AggregatorFactory>asList(
                                              QueryRunnerTestHelper.rowsCount,
                                              new LongSumAggregatorFactory(
                                                  "idx",
                                                  "index"
                                              )
                                          )
                                      )
                                      .setGranularity(
                                          new PeriodGranularity(
                                              new Period("P1D"),
                                              null,
                                              tz
                                          )
                                      )
                                      .build();
 
     List<Row> expectedResults = Arrays.asList(
         createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L),
@@ -993,8 +1030,14 @@ public class GroupByQueryRunnerTest
   private Iterable<Row> runQuery(GroupByQuery query)
   {
-    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
-    Sequence<Row> queryResult = toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)).run(query);
+    QueryToolChest toolChest = factory.getToolchest();
+    QueryRunner theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
+        toolChest
+    );
+
+    Sequence<Row> queryResult = theRunner.run(query);
     return Sequences.toList(queryResult, Lists.<Row>newArrayList());
   }
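The runQuery change above wraps the merged runner in FinalizeResultsQueryRunner so that complex aggregates, here the hyperUnique collector, are finalized into plain values before the test asserts on them; without that step the groupBy rows would still contain HyperLogLogCollector objects. A rough, self-contained sketch of that decorator idea, using hypothetical interfaces rather than Druid's QueryRunner and Sequence types:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FinalizeDecoratorSketch
{
  // Hypothetical mini query-runner: takes a query, returns a list of results.
  interface Runner<Q, R>
  {
    List<R> run(Q query);
  }

  // Decorator that maps every intermediate result through a finalizer before handing it
  // back, analogous in spirit to wrapping a runner in FinalizeResultsQueryRunner.
  static <Q, I, R> Runner<Q, R> finalizing(Runner<Q, I> base, Function<I, R> finalizer)
  {
    return query -> base.run(query).stream().map(finalizer).collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    // Intermediate results: sets standing in for per-row HyperLogLogCollectors.
    Runner<String, Set<String>> merged =
        query -> Arrays.asList(new HashSet<>(Arrays.asList("a", "b", "c")));

    // Finalization turns each sketch into the plain number callers compare against.
    Runner<String, Integer> finalized = finalizing(merged, Set::size);

    System.out.println(finalized.run("groupBy")); // [3]
  }
}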