Merge pull request #698 from metamx/hll-groupby-test

HLL GroupBy fixes + tests
fjy 2014-08-22 15:59:02 -06:00
commit 0d32466cad
3 changed files with 67 additions and 27 deletions


@@ -195,6 +195,13 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
     return applyCorrection(e, zeroCount);
   }
 
+  /**
+   * Checks if the payload for the given ByteBuffer is sparse or not.
+   * The given buffer must be positioned at getPayloadBytePosition() prior to calling isSparse.
+   *
+   * @param buffer
+   * @return
+   */
   private static boolean isSparse(ByteBuffer buffer)
   {
     return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
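
For readers skimming the hunk above: the new helper infers the encoding purely from how many bytes remain once the buffer is positioned at the payload, since a dense payload always occupies exactly NUM_BYTES_FOR_BUCKETS bytes. A self-contained sketch of that idea follows; the class name and the constant value are placeholders for illustration, not what HyperLogLogCollector actually uses.

import java.nio.ByteBuffer;

// Illustration of the isSparse() check above: the payload is treated as dense
// iff exactly NUM_BYTES_FOR_BUCKETS bytes remain at the payload position.
// The constant here is a placeholder, not Druid's real value.
public class SparseCheckSketch
{
  private static final int NUM_BYTES_FOR_BUCKETS = 1024; // placeholder value

  static boolean isSparse(ByteBuffer payloadBuffer)
  {
    return payloadBuffer.remaining() != NUM_BYTES_FOR_BUCKETS;
  }

  public static void main(String[] args)
  {
    System.out.println(isSparse(ByteBuffer.allocate(NUM_BYTES_FOR_BUCKETS))); // false -> dense
    System.out.println(isSparse(ByteBuffer.allocate(16)));                    // true  -> sparse
  }
}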
@@ -495,13 +502,32 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
       return false;
     }
 
-    HyperLogLogCollector collector = (HyperLogLogCollector) o;
+    ByteBuffer otherBuffer = ((HyperLogLogCollector) o).storageBuffer;
 
-    if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
+    if (storageBuffer != null ? false : otherBuffer != null) {
       return false;
     }
 
-    return true;
+    if (storageBuffer == null && otherBuffer == null) {
+      return true;
+    }
+
+    final ByteBuffer denseStorageBuffer;
+    if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
+      HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
+      denseCollector.convertToDenseStorage();
+      denseStorageBuffer = denseCollector.storageBuffer;
+    } else {
+      denseStorageBuffer = storageBuffer;
+    }
+
+    if (otherBuffer.remaining() != getNumBytesForDenseStorage()) {
+      HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
+      otherCollector.convertToDenseStorage();
+      otherBuffer = otherCollector.storageBuffer;
+    }
+
+    return denseStorageBuffer.equals(otherBuffer);
   }
 
   @Override
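
The rewritten equals() above normalizes both collectors to dense storage before comparing the underlying buffers, so two collectors holding the same registers now compare equal regardless of how each storage buffer happens to be encoded. A minimal sketch of the behavior, using only methods that appear in this diff; the class name HllEqualsSketch is made up for illustration.

import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

// Sketch: two collectors fed the same hashed values should compare equal;
// with this patch the comparison is done on dense representations of both
// storage buffers rather than on the raw buffers.
public class HllEqualsSketch
{
  public static void main(String[] args)
  {
    final HashFunction hashFn = Hashing.murmur3_128();

    HyperLogLogCollector left = HyperLogLogCollector.makeLatestCollector();
    HyperLogLogCollector right = HyperLogLogCollector.makeLatestCollector();

    for (String value : new String[]{"abc123", "123abc"}) {
      byte[] hashed = hashFn.hashString(value, Charsets.UTF_8).asBytes();
      left.add(hashed);
      right.add(hashed);
    }

    // Expected to print "true" with the patched equals().
    System.out.println(left.equals(right));
  }
}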


@@ -355,7 +355,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
             while (aggsIter.hasNext()) {
               final AggregatorFactory factory = aggsIter.next();
-              Object agg = event.remove(factory.getName());
+              Object agg = event.get(factory.getName());
               if (agg != null) {
                 event.put(factory.getName(), factory.deserialize(agg));
               }
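
The one-line change above deserializes cached aggregator values with get() + put() instead of remove() + put(). One observable difference, assuming the event map preserves insertion order (a LinkedHashMap is used below purely for illustration; the diff does not show the actual map type), is that remove-then-put re-appends the key at the end of the iteration order while get-then-put overwrites it in place.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: remove()+put() moves the entry to the end of a
// LinkedHashMap's iteration order, while get()+put() keeps its position.
public class RemoveVsGetSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> viaRemove = new LinkedHashMap<>();
    viaRemove.put("rows", 1);
    viaRemove.put("uniques", "raw");
    viaRemove.put("imps", 1);
    Object agg = viaRemove.remove("uniques");
    viaRemove.put("uniques", "deserialized:" + agg);
    System.out.println(viaRemove); // {rows=1, imps=1, uniques=deserialized:raw}

    Map<String, Object> viaGet = new LinkedHashMap<>();
    viaGet.put("rows", 1);
    viaGet.put("uniques", "raw");
    viaGet.put("imps", 1);
    Object agg2 = viaGet.get("uniques");
    viaGet.put("uniques", "deserialized:" + agg2);
    System.out.println(viaGet); // {rows=1, uniques=deserialized:raw, imps=1}
  }
}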


@@ -22,6 +22,7 @@ package io.druid.client;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
@@ -31,6 +32,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.guava.MergeIterable;
@@ -65,6 +68,8 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
 import io.druid.query.aggregation.post.ConstantPostAggregator;
 import io.druid.query.aggregation.post.FieldAccessPostAggregator;
@@ -806,41 +811,50 @@ public class CachingClusteredClientTest
   @Test
   public void testGroupByCaching() throws Exception
   {
+    List<AggregatorFactory> aggsWithUniques = ImmutableList.<AggregatorFactory>builder().addAll(AGGS)
+        .add(new HyperUniquesAggregatorFactory("uniques", "uniques")).build();
+
+    final HashFunction hashFn = Hashing.murmur3_128();
+
     GroupByQuery.Builder builder = new GroupByQuery.Builder()
         .setDataSource(DATA_SOURCE)
         .setQuerySegmentSpec(SEG_SPEC)
         .setDimFilter(DIM_FILTER)
         .setGranularity(GRANULARITY)
         .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "a")))
-        .setAggregatorSpecs(AGGS)
+        .setAggregatorSpecs(aggsWithUniques)
         .setPostAggregatorSpecs(POST_AGGS)
         .setContext(CONTEXT);
 
+    final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
+    collector.add(hashFn.hashString("abc123", Charsets.UTF_8).asBytes());
+    collector.add(hashFn.hashString("123abc", Charsets.UTF_8).asBytes());
+
     testQueryCaching(
         client,
         builder.build(),
         new Interval("2011-01-01/2011-01-02"),
-        makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1)),
+        makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1, "uniques", collector)),
 
         new Interval("2011-01-02/2011-01-03"),
-        makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2)),
+        makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2, "uniques", collector)),
 
         new Interval("2011-01-05/2011-01-10"),
         makeGroupByResults(
-            new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
-            new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
-            new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
-            new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
-            new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
+            new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
+            new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
+            new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
+            new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
+            new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
         ),
 
         new Interval("2011-01-05/2011-01-10"),
         makeGroupByResults(
-            new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
-            new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
-            new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
-            new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
-            new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
+            new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
         )
     );
@@ -874,16 +888,16 @@ public class CachingClusteredClientTest
     );
     TestHelper.assertExpectedObjects(
         makeGroupByResults(
-            new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
-            new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
-            new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
-            new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
-            new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
-            new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
-            new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
-            new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
-            new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7),
-            new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
+            new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
+            new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
+            new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
+            new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
+            new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
         ),
         runner.run(
             builder.setInterval("2011-01-05/2011-01-10")