Merge pull request #420 from metamx/hyperunique

Production-tested cardinality estimation using HyperLogLog
fjy 2014-03-05 16:57:44 -07:00
commit 6137c374a7
19 changed files with 3324 additions and 310 deletions

View File

@ -82,3 +82,13 @@ All JavaScript functions must return numerical values.
"fnReset" : "function() { return 10; }"
}
```
### Complex aggregators
#### `hyperUnique` aggregator
`hyperUnique` uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension.
```json
{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
```
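
With the 2048 (2^11) registers this implementation uses, the expected HyperLogLog standard error is roughly 1.04 / sqrt(2048), or about 2.3% of the true cardinality.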

View File

@ -64,6 +64,31 @@ Example JavaScript aggregator:
"function": "function(delta, total) { return 100 * Math.abs(delta) / total; }"
}
```
### `hyperUniqueCardinality` post-aggregator
The `hyperUniqueCardinality` post-aggregator wraps a `hyperUnique` object so that it can be used in post-aggregations.
```json
{ "type" : "hyperUniqueCardinality", "fieldName" : <the name field value of the hyperUnique aggregator>}
```
For example, it can be used to compute the average number of unique users per row:
```json
"aggregations" : [{
{"type" : "count", "name" : "rows"},
{"type" : "hyperUnique", "name" : "unique_users", "fieldName" : "uniques"}
}],
"postAggregations" : {
"type" : "arithmetic",
"name" : "average_users_per_row",
"fn" : "/",
"fields" : [
{ "type" : "hyperUniqueCardinality", "fieldName" : "unique_users" },
{ "type" : "fieldAccess", "name" : "rows", "fieldName" : "rows" }
]
}]
```
### Example Usage
@ -98,4 +123,4 @@ The format of the query JSON is as follows:
...
}
```

View File

@ -22,6 +22,7 @@ package io.druid.jackson;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -31,10 +32,14 @@ import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.aggregation.MinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
import io.druid.segment.serde.ComplexMetrics;
/**
*/
@ -44,28 +49,38 @@ public class AggregatorsModule extends SimpleModule
{
super("AggregatorFactories");
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
}
@JsonTypeInfo(use= JsonTypeInfo.Id.NAME, property="type")
@JsonSubTypes(value={
@JsonSubTypes.Type(name="count", value=CountAggregatorFactory.class),
@JsonSubTypes.Type(name="longSum", value=LongSumAggregatorFactory.class),
@JsonSubTypes.Type(name="doubleSum", value=DoubleSumAggregatorFactory.class),
@JsonSubTypes.Type(name="max", value=MaxAggregatorFactory.class),
@JsonSubTypes.Type(name="min", value=MinAggregatorFactory.class),
@JsonSubTypes.Type(name="javascript", value=JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name="histogram", value=HistogramAggregatorFactory.class)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "count", value = CountAggregatorFactory.class),
@JsonSubTypes.Type(name = "longSum", value = LongSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleSum", value = DoubleSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "max", value = MaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "min", value = MinAggregatorFactory.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin {}
public static interface AggregatorFactoryMixin
{
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class)
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class)
})
public static interface PostAggregatorMixin {}
public static interface PostAggregatorMixin
{
}
}

View File

@ -0,0 +1,288 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
/**
*/
public class ByteBitLookup
{
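// lookup[b] is the 1-based position of the least significant set bit of the unsigned byte b (0 when b == 0);
// HyperLogLogCollector.add() uses it to locate the first set bit of a hashed value.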
public static final byte[] lookup;
static {
lookup = new byte[256];
lookup[0] = 0;
lookup[1] = 1;
lookup[2] = 2;
lookup[3] = 1;
lookup[4] = 3;
lookup[5] = 1;
lookup[6] = 2;
lookup[7] = 1;
lookup[8] = 4;
lookup[9] = 1;
lookup[10] = 2;
lookup[11] = 1;
lookup[12] = 3;
lookup[13] = 1;
lookup[14] = 2;
lookup[15] = 1;
lookup[16] = 5;
lookup[17] = 1;
lookup[18] = 2;
lookup[19] = 1;
lookup[20] = 3;
lookup[21] = 1;
lookup[22] = 2;
lookup[23] = 1;
lookup[24] = 4;
lookup[25] = 1;
lookup[26] = 2;
lookup[27] = 1;
lookup[28] = 3;
lookup[29] = 1;
lookup[30] = 2;
lookup[31] = 1;
lookup[32] = 6;
lookup[33] = 1;
lookup[34] = 2;
lookup[35] = 1;
lookup[36] = 3;
lookup[37] = 1;
lookup[38] = 2;
lookup[39] = 1;
lookup[40] = 4;
lookup[41] = 1;
lookup[42] = 2;
lookup[43] = 1;
lookup[44] = 3;
lookup[45] = 1;
lookup[46] = 2;
lookup[47] = 1;
lookup[48] = 5;
lookup[49] = 1;
lookup[50] = 2;
lookup[51] = 1;
lookup[52] = 3;
lookup[53] = 1;
lookup[54] = 2;
lookup[55] = 1;
lookup[56] = 4;
lookup[57] = 1;
lookup[58] = 2;
lookup[59] = 1;
lookup[60] = 3;
lookup[61] = 1;
lookup[62] = 2;
lookup[63] = 1;
lookup[64] = 7;
lookup[65] = 1;
lookup[66] = 2;
lookup[67] = 1;
lookup[68] = 3;
lookup[69] = 1;
lookup[70] = 2;
lookup[71] = 1;
lookup[72] = 4;
lookup[73] = 1;
lookup[74] = 2;
lookup[75] = 1;
lookup[76] = 3;
lookup[77] = 1;
lookup[78] = 2;
lookup[79] = 1;
lookup[80] = 5;
lookup[81] = 1;
lookup[82] = 2;
lookup[83] = 1;
lookup[84] = 3;
lookup[85] = 1;
lookup[86] = 2;
lookup[87] = 1;
lookup[88] = 4;
lookup[89] = 1;
lookup[90] = 2;
lookup[91] = 1;
lookup[92] = 3;
lookup[93] = 1;
lookup[94] = 2;
lookup[95] = 1;
lookup[96] = 6;
lookup[97] = 1;
lookup[98] = 2;
lookup[99] = 1;
lookup[100] = 3;
lookup[101] = 1;
lookup[102] = 2;
lookup[103] = 1;
lookup[104] = 4;
lookup[105] = 1;
lookup[106] = 2;
lookup[107] = 1;
lookup[108] = 3;
lookup[109] = 1;
lookup[110] = 2;
lookup[111] = 1;
lookup[112] = 5;
lookup[113] = 1;
lookup[114] = 2;
lookup[115] = 1;
lookup[116] = 3;
lookup[117] = 1;
lookup[118] = 2;
lookup[119] = 1;
lookup[120] = 4;
lookup[121] = 1;
lookup[122] = 2;
lookup[123] = 1;
lookup[124] = 3;
lookup[125] = 1;
lookup[126] = 2;
lookup[127] = 1;
lookup[128] = 8;
lookup[129] = 1;
lookup[130] = 2;
lookup[131] = 1;
lookup[132] = 3;
lookup[133] = 1;
lookup[134] = 2;
lookup[135] = 1;
lookup[136] = 4;
lookup[137] = 1;
lookup[138] = 2;
lookup[139] = 1;
lookup[140] = 3;
lookup[141] = 1;
lookup[142] = 2;
lookup[143] = 1;
lookup[144] = 5;
lookup[145] = 1;
lookup[146] = 2;
lookup[147] = 1;
lookup[148] = 3;
lookup[149] = 1;
lookup[150] = 2;
lookup[151] = 1;
lookup[152] = 4;
lookup[153] = 1;
lookup[154] = 2;
lookup[155] = 1;
lookup[156] = 3;
lookup[157] = 1;
lookup[158] = 2;
lookup[159] = 1;
lookup[160] = 6;
lookup[161] = 1;
lookup[162] = 2;
lookup[163] = 1;
lookup[164] = 3;
lookup[165] = 1;
lookup[166] = 2;
lookup[167] = 1;
lookup[168] = 4;
lookup[169] = 1;
lookup[170] = 2;
lookup[171] = 1;
lookup[172] = 3;
lookup[173] = 1;
lookup[174] = 2;
lookup[175] = 1;
lookup[176] = 5;
lookup[177] = 1;
lookup[178] = 2;
lookup[179] = 1;
lookup[180] = 3;
lookup[181] = 1;
lookup[182] = 2;
lookup[183] = 1;
lookup[184] = 4;
lookup[185] = 1;
lookup[186] = 2;
lookup[187] = 1;
lookup[188] = 3;
lookup[189] = 1;
lookup[190] = 2;
lookup[191] = 1;
lookup[192] = 7;
lookup[193] = 1;
lookup[194] = 2;
lookup[195] = 1;
lookup[196] = 3;
lookup[197] = 1;
lookup[198] = 2;
lookup[199] = 1;
lookup[200] = 4;
lookup[201] = 1;
lookup[202] = 2;
lookup[203] = 1;
lookup[204] = 3;
lookup[205] = 1;
lookup[206] = 2;
lookup[207] = 1;
lookup[208] = 5;
lookup[209] = 1;
lookup[210] = 2;
lookup[211] = 1;
lookup[212] = 3;
lookup[213] = 1;
lookup[214] = 2;
lookup[215] = 1;
lookup[216] = 4;
lookup[217] = 1;
lookup[218] = 2;
lookup[219] = 1;
lookup[220] = 3;
lookup[221] = 1;
lookup[222] = 2;
lookup[223] = 1;
lookup[224] = 6;
lookup[225] = 1;
lookup[226] = 2;
lookup[227] = 1;
lookup[228] = 3;
lookup[229] = 1;
lookup[230] = 2;
lookup[231] = 1;
lookup[232] = 4;
lookup[233] = 1;
lookup[234] = 2;
lookup[235] = 1;
lookup[236] = 3;
lookup[237] = 1;
lookup[238] = 2;
lookup[239] = 1;
lookup[240] = 5;
lookup[241] = 1;
lookup[242] = 2;
lookup[243] = 1;
lookup[244] = 3;
lookup[245] = 1;
lookup[246] = 2;
lookup[247] = 1;
lookup[248] = 4;
lookup[249] = 1;
lookup[250] = 2;
lookup[251] = 1;
lookup[252] = 3;
lookup[253] = 1;
lookup[254] = 2;
lookup[255] = 1;
}
}

View File

@ -0,0 +1,152 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import java.nio.ByteBuffer;
/**
*/
@Deprecated
public class HLLCV0 extends HyperLogLogCollector
{
/**
* Header:
* Byte 0: registerOffset
* Byte 1-2: numNonZeroRegisters
*/
public static final int NUM_NON_ZERO_REGISTERS_BYTE = 1;
public static final int HEADER_NUM_BYTES = 3;
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();
public HLLCV0()
{
super(defaultStorageBuffer);
}
public HLLCV0(ByteBuffer buffer)
{
super(buffer);
}
@Override
public byte getVersion()
{
return 0;
}
@Override
public void setVersion(ByteBuffer buffer)
{
}
@Override
public byte getRegisterOffset()
{
return getStorageBuffer().get(getInitPosition());
}
@Override
public void setRegisterOffset(byte registerOffset)
{
getStorageBuffer().put(getInitPosition(), registerOffset);
}
@Override
public void setRegisterOffset(ByteBuffer buffer, byte registerOffset)
{
buffer.put(buffer.position(), registerOffset);
}
@Override
public short getNumNonZeroRegisters()
{
return getStorageBuffer().getShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE);
}
@Override
public void setNumNonZeroRegisters(short numNonZeroRegisters)
{
getStorageBuffer().putShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters)
{
buffer.putShort(buffer.position() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public byte getMaxOverflowValue()
{
return 0;
}
@Override
public void setMaxOverflowValue(byte value)
{
}
@Override
public void setMaxOverflowValue(ByteBuffer buffer, byte value)
{
}
@Override
public short getMaxOverflowRegister()
{
return 0;
}
@Override
public void setMaxOverflowRegister(short register)
{
}
@Override
public void setMaxOverflowRegister(ByteBuffer buffer, short register)
{
}
@Override
public int getNumHeaderBytes()
{
return HEADER_NUM_BYTES;
}
@Override
public int getNumBytesForDenseStorage()
{
return NUM_BYTES_FOR_DENSE_STORAGE;
}
@Override
public int getPayloadBytePosition()
{
return getInitPosition() + HEADER_NUM_BYTES;
}
@Override
public int getPayloadBytePosition(ByteBuffer buffer)
{
return buffer.position() + HEADER_NUM_BYTES;
}
}

View File

@ -0,0 +1,164 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import java.nio.ByteBuffer;
/**
*/
public class HLLCV1 extends HyperLogLogCollector
{
/**
* Header:
* Byte 0: version
* Byte 1: registerOffset
* Byte 2-3: numNonZeroRegisters
* Byte 4: maxOverflowValue
* Byte 5-6: maxOverflowRegister
*/
public static final byte VERSION = 0x1;
public static final int REGISTER_OFFSET_BYTE = 1;
public static final int NUM_NON_ZERO_REGISTERS_BYTE = 2;
public static final int MAX_OVERFLOW_VALUE_BYTE = 4;
public static final int MAX_OVERFLOW_REGISTER_BYTE = 5;
public static final int HEADER_NUM_BYTES = 7;
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
.asReadOnlyBuffer();
public HLLCV1()
{
super(defaultStorageBuffer);
}
public HLLCV1(ByteBuffer buffer)
{
super(buffer);
}
@Override
public byte getVersion()
{
return VERSION;
}
@Override
public void setVersion(ByteBuffer buffer)
{
buffer.put(buffer.position(), VERSION);
}
@Override
public byte getRegisterOffset()
{
return getStorageBuffer().get(getInitPosition() + REGISTER_OFFSET_BYTE);
}
@Override
public void setRegisterOffset(byte registerOffset)
{
getStorageBuffer().put(getInitPosition() + REGISTER_OFFSET_BYTE, registerOffset);
}
@Override
public void setRegisterOffset(ByteBuffer buffer, byte registerOffset)
{
buffer.put(buffer.position() + REGISTER_OFFSET_BYTE, registerOffset);
}
@Override
public short getNumNonZeroRegisters()
{
return getStorageBuffer().getShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE);
}
@Override
public void setNumNonZeroRegisters(short numNonZeroRegisters)
{
getStorageBuffer().putShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters)
{
buffer.putShort(buffer.position() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public byte getMaxOverflowValue()
{
return getStorageBuffer().get(getInitPosition() + MAX_OVERFLOW_VALUE_BYTE);
}
@Override
public void setMaxOverflowValue(byte value)
{
getStorageBuffer().put(getInitPosition() + MAX_OVERFLOW_VALUE_BYTE, value);
}
@Override
public void setMaxOverflowValue(ByteBuffer buffer, byte value)
{
buffer.put(buffer.position() + MAX_OVERFLOW_VALUE_BYTE, value);
}
@Override
public short getMaxOverflowRegister()
{
return getStorageBuffer().getShort(getInitPosition() + MAX_OVERFLOW_REGISTER_BYTE);
}
@Override
public void setMaxOverflowRegister(short register)
{
getStorageBuffer().putShort(getInitPosition() + MAX_OVERFLOW_REGISTER_BYTE, register);
}
@Override
public void setMaxOverflowRegister(ByteBuffer buffer, short register)
{
buffer.putShort(buffer.position() + MAX_OVERFLOW_REGISTER_BYTE, register);
}
@Override
public int getNumHeaderBytes()
{
return HEADER_NUM_BYTES;
}
@Override
public int getNumBytesForDenseStorage()
{
return NUM_BYTES_FOR_DENSE_STORAGE;
}
@Override
public int getPayloadBytePosition()
{
return getInitPosition() + HEADER_NUM_BYTES;
}
@Override
public int getPayloadBytePosition(ByteBuffer buffer)
{
return buffer.position() + HEADER_NUM_BYTES;
}
}

View File

@ -0,0 +1,673 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.primitives.UnsignedBytes;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.nio.ByteBuffer;
/**
* Implements the HyperLogLog cardinality estimator described in:
* <p/>
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
* <p/>
* Run this code to see a simple indication of expected errors based on different m values:
* <p/>
* for (int i = 1; i < 20; ++i) {
* System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
* }
* <p/>
* This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
* only one thread is ever calling methods on it.
* <p/>
* If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
*/
public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
{
public static final int DENSE_THRESHOLD = 128;
public static final int BITS_FOR_BUCKETS = 11;
public static final int NUM_BUCKETS = 1 << BITS_FOR_BUCKETS;
public static final int NUM_BYTES_FOR_BUCKETS = NUM_BUCKETS / 2;
private static final double TWO_TO_THE_SIXTY_FOUR = Math.pow(2, 64);
private static final double ALPHA = 0.7213 / (1 + 1.079 / NUM_BUCKETS);
public static final double LOW_CORRECTION_THRESHOLD = (5 * NUM_BUCKETS) / 2.0d;
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
private static final Logger log = new Logger(HyperLogLogCollector.class);
private static final int bucketMask = 0x7ff;
private static final int minBytesRequired = 10;
private static final int bitsPerBucket = 4;
private static final int range = (int) Math.pow(2, bitsPerBucket) - 1;
private final static double[][] minNumRegisterLookup = new double[64][256];
static {
for (int registerOffset = 0; registerOffset < 64; ++registerOffset) {
for (int register = 0; register < 256; ++register) {
final int upper = ((register & 0xf0) >> 4) + registerOffset;
final int lower = (register & 0x0f) + registerOffset;
minNumRegisterLookup[registerOffset][register] = 1.0d / Math.pow(2, upper) + 1.0d / Math.pow(2, lower);
}
}
}
// we have to keep track of the number of zeroes in each of the two halves of the byte register (0, 1, or 2)
private final static int[] numZeroLookup = new int[256];
static {
for (int i = 0; i < numZeroLookup.length; ++i) {
numZeroLookup[i] = (((i & 0xf0) == 0) ? 1 : 0) + (((i & 0x0f) == 0) ? 1 : 0);
}
}
// Methods to build the latest HLLC
public static HyperLogLogCollector makeLatestCollector()
{
return new HLLCV1();
}
public static HyperLogLogCollector makeCollector(ByteBuffer buffer)
{
int remaining = buffer.remaining();
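// Distinguish the legacy V0 layout by size: V0 has a 3-byte header, so sparse V0 buffers are a
// multiple of 3 bytes and dense V0 buffers are 3 + 1024 = 1027 bytes; anything else is V1.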
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
}
public static int getLatestNumBytesForDenseStorage()
{
return HLLCV1.NUM_BYTES_FOR_DENSE_STORAGE;
}
public static byte[] makeEmptyVersionedByteArray()
{
byte[] arr = new byte[getLatestNumBytesForDenseStorage()];
arr[0] = HLLCV1.VERSION;
return arr;
}
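// Applies the standard HyperLogLog range corrections: linear counting for small estimates
// (below 5m/2) and the large-range 2^64 correction, per Flajolet et al.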
public static double applyCorrection(double e, int zeroCount)
{
e = CORRECTION_PARAMETER / e;
if (e <= LOW_CORRECTION_THRESHOLD) {
return zeroCount == 0 ? e : NUM_BUCKETS * Math.log(NUM_BUCKETS / (double) zeroCount);
}
if (e > HIGH_CORRECTION_THRESHOLD) {
final double ratio = e / TWO_TO_THE_SIXTY_FOUR;
if (ratio >= 1) {
// handle very unlikely case that value is > 2^64
return Double.MAX_VALUE;
} else {
return -TWO_TO_THE_SIXTY_FOUR * Math.log(1 - ratio);
}
}
return e;
}
private static double estimateSparse(
final ByteBuffer buf,
final byte minNum,
final byte overflowValue,
final short overflowPosition,
final boolean isUpperNibble
)
{
final ByteBuffer copy = buf.asReadOnlyBuffer();
double e = 0.0d;
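// registers missing from the sparse encoding are zero; each 3-byte sparse entry holds two registers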
int zeroCount = NUM_BUCKETS - 2 * (buf.remaining() / 3);
while (copy.hasRemaining()) {
short position = copy.getShort();
final int register = (int) copy.get() & 0xff;
if (overflowValue != 0 && position == overflowPosition) {
int upperNibble = ((register & 0xf0) >>> bitsPerBucket) + minNum;
int lowerNibble = (register & 0x0f) + minNum;
if (isUpperNibble) {
upperNibble = Math.max(upperNibble, overflowValue);
} else {
lowerNibble = Math.max(lowerNibble, overflowValue);
}
e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble);
zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0);
} else {
e += minNumRegisterLookup[minNum][register];
zeroCount += numZeroLookup[register];
}
}
e += zeroCount;
return applyCorrection(e, zeroCount);
}
private static double estimateDense(
final ByteBuffer buf,
final byte minNum,
final byte overflowValue,
final short overflowPosition,
final boolean isUpperNibble
)
{
final ByteBuffer copy = buf.asReadOnlyBuffer();
double e = 0.0d;
int zeroCount = 0;
int position = 0;
while (copy.hasRemaining()) {
final int register = (int) copy.get() & 0xff;
if (overflowValue != 0 && position == overflowPosition) {
int upperNibble = ((register & 0xf0) >>> bitsPerBucket) + minNum;
int lowerNibble = (register & 0x0f) + minNum;
if (isUpperNibble) {
upperNibble = Math.max(upperNibble, overflowValue);
} else {
lowerNibble = Math.max(lowerNibble, overflowValue);
}
e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble);
zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0);
} else {
e += minNumRegisterLookup[minNum][register];
zeroCount += numZeroLookup[register];
}
position++;
}
return applyCorrection(e, zeroCount);
}
private static boolean isSparse(ByteBuffer buffer)
{
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
}
private volatile ByteBuffer storageBuffer;
private volatile int initPosition;
private volatile Double estimatedCardinality;
public HyperLogLogCollector(ByteBuffer byteBuffer)
{
storageBuffer = byteBuffer.duplicate();
initPosition = byteBuffer.position();
estimatedCardinality = null;
}
public abstract byte getVersion();
public abstract void setVersion(ByteBuffer buffer);
public abstract byte getRegisterOffset();
public abstract void setRegisterOffset(byte registerOffset);
public abstract void setRegisterOffset(ByteBuffer buffer, byte registerOffset);
public abstract short getNumNonZeroRegisters();
public abstract void setNumNonZeroRegisters(short numNonZeroRegisters);
public abstract void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters);
public abstract byte getMaxOverflowValue();
public abstract void setMaxOverflowValue(byte value);
public abstract void setMaxOverflowValue(ByteBuffer buffer, byte value);
public abstract short getMaxOverflowRegister();
public abstract void setMaxOverflowRegister(short register);
public abstract void setMaxOverflowRegister(ByteBuffer buffer, short register);
public abstract int getNumHeaderBytes();
public abstract int getNumBytesForDenseStorage();
public abstract int getPayloadBytePosition();
public abstract int getPayloadBytePosition(ByteBuffer buffer);
protected int getInitPosition()
{
return initPosition;
}
protected ByteBuffer getStorageBuffer()
{
return storageBuffer;
}
public void add(byte[] hashedValue)
{
if (hashedValue.length < minBytesRequired) {
throw new IAE("Insufficient bytes, need[%d] got [%d]", minBytesRequired, hashedValue.length);
}
estimatedCardinality = null;
final ByteBuffer buffer = ByteBuffer.wrap(hashedValue);
short bucket = (short) (buffer.getShort(hashedValue.length - 2) & bucketMask);
byte positionOf1 = 0;
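// scan the first 8 bytes of the hash: each all-zero byte contributes 8 to positionOf1, and the first
// non-zero byte contributes the 1-based position of its lowest set bit (via ByteBitLookup)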
for (int i = 0; i < 8; ++i) {
byte lookupVal = ByteBitLookup.lookup[UnsignedBytes.toInt(hashedValue[i])];
switch (lookupVal) {
case 0:
positionOf1 += 8;
continue;
default:
positionOf1 += lookupVal;
i = 8;
break;
}
}
add(bucket, positionOf1);
}
public void add(short bucket, byte positionOf1)
{
if (storageBuffer.isReadOnly()) {
convertToMutableByteBuffer();
}
byte registerOffset = getRegisterOffset();
// discard everything outside of the range we care about
if (positionOf1 <= registerOffset) {
return;
} else if (positionOf1 > (registerOffset + range)) {
byte currMax = getMaxOverflowValue();
if (positionOf1 > currMax) {
setMaxOverflowValue(positionOf1);
setMaxOverflowRegister(bucket);
}
return;
}
// whatever value we add must be stored in 4 bits
short numNonZeroRegisters = addNibbleRegister(bucket, (byte) ((0xff & positionOf1) - registerOffset));
setNumNonZeroRegisters(numNonZeroRegisters);
if (numNonZeroRegisters == NUM_BUCKETS) {
setRegisterOffset(++registerOffset);
setNumNonZeroRegisters(decrementBuckets());
}
}
public HyperLogLogCollector fold(HyperLogLogCollector other)
{
if (other == null || other.storageBuffer.remaining() == 0) {
return this;
}
if (storageBuffer.isReadOnly()) {
convertToMutableByteBuffer();
}
estimatedCardinality = null;
if (getRegisterOffset() < other.getRegisterOffset()) {
// "Swap" the buffers so that we are folding into the one with the higher offset
ByteBuffer newStorage = ByteBuffer.allocate(other.storageBuffer.remaining());
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
newStorage.clear();
other.storageBuffer = storageBuffer;
other.initPosition = initPosition;
storageBuffer = newStorage;
initPosition = 0;
}
ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
byte otherOffset = other.getRegisterOffset();
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}
byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();
int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}
byte otherOverflowValue = other.getMaxOverflowValue();
short otherOverflowRegister = other.getMaxOverflowRegister();
add(otherOverflowRegister, otherOverflowValue);
int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition());
if (isSparse(otherBuffer)) {
while (otherBuffer.hasRemaining()) {
short position = otherBuffer.getShort();
int payloadStartPosition = position - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister(
myPayloadStart + payloadStartPosition,
offsetDiff,
otherBuffer.get()
);
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
}
} else { // dense
int position = getPayloadBytePosition();
while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister(
position,
offsetDiff,
otherBuffer.get()
);
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
position++;
}
}
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
return this;
}
public HyperLogLogCollector fold(ByteBuffer buffer)
{
return fold(makeCollector(buffer));
}
public ByteBuffer toByteBuffer()
{
short numNonZeroRegisters = getNumNonZeroRegisters();
// store sparsely
if (storageBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
setVersion(retVal);
setRegisterOffset(retVal, getRegisterOffset());
setNumNonZeroRegisters(retVal, numNonZeroRegisters);
setMaxOverflowValue(retVal, getMaxOverflowValue());
setMaxOverflowRegister(retVal, getMaxOverflowRegister());
int startPosition = getPayloadBytePosition();
retVal.position(getPayloadBytePosition(retVal));
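// each sparse entry is 3 bytes: a 2-byte position (relative to the start of the collector) followed by the register byte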
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
if (storageBuffer.get(i) != 0) {
retVal.putShort((short) (0xffff & (i - initPosition)));
retVal.put(storageBuffer.get(i));
}
}
retVal.rewind();
return retVal.asReadOnlyBuffer();
}
return storageBuffer.asReadOnlyBuffer();
}
@JsonValue
public byte[] toByteArray()
{
final ByteBuffer buffer = toByteBuffer();
byte[] theBytes = new byte[buffer.remaining()];
buffer.get(theBytes);
return theBytes;
}
public double estimateCardinality()
{
if (estimatedCardinality == null) {
byte registerOffset = getRegisterOffset();
byte overflowValue = getMaxOverflowValue();
short overflowRegister = getMaxOverflowRegister();
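// two 4-bit registers are packed per byte: byte offset = register >> 1, and even-numbered registers use the upper nibble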
short overflowPosition = (short) (overflowRegister >>> 1);
boolean isUpperNibble = ((overflowRegister & 0x1) == 0);
storageBuffer.position(getPayloadBytePosition());
if (isSparse(storageBuffer)) {
estimatedCardinality = estimateSparse(
storageBuffer,
registerOffset,
overflowValue,
overflowPosition,
isUpperNibble
);
} else {
estimatedCardinality = estimateDense(
storageBuffer,
registerOffset,
overflowValue,
overflowPosition,
isUpperNibble
);
}
storageBuffer.position(initPosition);
}
return estimatedCardinality;
}
public double estimateByteBuffer(ByteBuffer buf)
{
return makeCollector(buf).estimateCardinality();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HyperLogLogCollector collector = (HyperLogLogCollector) o;
if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = storageBuffer != null ? storageBuffer.hashCode() : 0;
result = 31 * result + initPosition;
return result;
}
@Override
public String toString()
{
return "HyperLogLogCollector{" +
"initPosition=" + initPosition +
", version=" + getVersion() +
", registerOffset=" + getRegisterOffset() +
", numNonZeroRegisters=" + getNumNonZeroRegisters() +
", maxOverflowValue=" + getMaxOverflowValue() +
", maxOverflowRegister=" + getMaxOverflowRegister() +
'}';
}
private short decrementBuckets()
{
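// only called once every register is non-zero, so subtracting 0x11 decrements both 4-bit registers
// in each byte without borrowing; returns how many registers remain non-zero afterwards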
short count = 0;
int startPosition = getPayloadBytePosition();
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
byte val = (byte) (storageBuffer.get(i) - 0x11);
if ((val & 0xf0) != 0) {
count++;
}
if ((val & 0x0f) != 0) {
count++;
}
storageBuffer.put(i, val);
}
return count;
}
private void convertToMutableByteBuffer()
{
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.position(0);
storageBuffer = tmpBuffer;
initPosition = 0;
}
private void convertToDenseStorage()
{
ByteBuffer tmpBuffer = ByteBuffer.wrap(new byte[getNumBytesForDenseStorage()]);
// put header
setVersion(tmpBuffer);
setRegisterOffset(tmpBuffer, getRegisterOffset());
setNumNonZeroRegisters(tmpBuffer, getNumNonZeroRegisters());
setMaxOverflowValue(tmpBuffer, getMaxOverflowValue());
setMaxOverflowRegister(tmpBuffer, getMaxOverflowRegister());
storageBuffer.position(getPayloadBytePosition());
tmpBuffer.position(getPayloadBytePosition(tmpBuffer));
// put payload
while (storageBuffer.hasRemaining()) {
tmpBuffer.put(storageBuffer.getShort(), storageBuffer.get());
}
tmpBuffer.rewind();
storageBuffer = tmpBuffer;
initPosition = 0;
}
private short addNibbleRegister(short bucket, byte positionOf1)
{
short numNonZeroRegs = getNumNonZeroRegisters();
final short position = (short) (bucket >> 1);
final boolean isUpperNibble = ((bucket & 0x1) == 0);
byte shiftedPositionOf1 = (isUpperNibble) ? (byte) (positionOf1 << bitsPerBucket) : positionOf1;
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}
byte origVal = storageBuffer.get(getPayloadBytePosition() + position);
byte newValueMask = (isUpperNibble) ? (byte) 0xf0 : (byte) 0x0f;
byte originalValueMask = (byte) (newValueMask ^ 0xff);
// if something was at zero, we have to increase the numNonZeroRegisters
if ((origVal & newValueMask) == 0 && shiftedPositionOf1 != 0) {
numNonZeroRegs++;
}
storageBuffer.put(
getPayloadBytePosition() + position,
(byte) (UnsignedBytes.max((byte) (origVal & newValueMask), shiftedPositionOf1) | (origVal & originalValueMask))
);
return numNonZeroRegs;
}
/**
* Returns the number of registers that are no longer zero after the value was added
*
* @param position The position into the byte buffer, this position represents two "registers"
* @param offsetDiff The difference in offset between the byteToAdd and the current HyperLogLogCollector
* @param byteToAdd The byte to merge into the current HyperLogLogCollector
*
* @return the number of registers that changed from zero to non-zero as a result of the merge
*/
private int mergeAndStoreByteRegister(
int position,
int offsetDiff,
byte byteToAdd
)
{
if (byteToAdd == 0) {
return 0;
}
byte currVal = storageBuffer.get(position);
int upperNibble = currVal & 0xf0;
int lowerNibble = currVal & 0x0f;
// subtract the differences so that the nibbles align
int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
int otherLower = (byteToAdd & 0x0f) - offsetDiff;
final int newUpper = Math.max(upperNibble, otherUpper);
final int newLower = Math.max(lowerNibble, otherLower);
int numNoLongerZero = 0;
if (upperNibble == 0 && newUpper > 0) {
++numNoLongerZero;
}
if (lowerNibble == 0 && newLower > 0) {
++numNoLongerZero;
}
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
return numNoLongerZero;
}
@Override
public int compareTo(HyperLogLogCollector other)
{
final int lhsOffset = (int) this.getRegisterOffset() & 0xffff;
final int rhsOffset = (int) other.getRegisterOffset() & 0xffff;
if (lhsOffset == rhsOffset) {
final int lhsNumNonZero = (int) this.getNumNonZeroRegisters() & 0xffff;
final int rhsNumNonZero = (int) other.getNumNonZeroRegisters() & 0xffff;
int retVal = Double.compare(lhsNumNonZero, rhsNumNonZero);
if (retVal == 0) {
retVal = Double.compare(this.estimateCardinality(), other.estimateCardinality());
}
return retVal;
} else {
return Double.compare(lhsOffset, rhsOffset);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.PostAggregator;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
*/
public class HyperUniqueFinalizingPostAggregator implements PostAggregator
{
private final String fieldName;
@JsonCreator
public HyperUniqueFinalizingPostAggregator(
@JsonProperty("fieldName") String fieldName
)
{
this.fieldName = fieldName;
}
@Override
public Set<String> getDependentFields()
{
return Sets.newHashSet(fieldName);
}
@Override
public Comparator getComparator()
{
throw new UnsupportedOperationException();
}
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
return HyperUniquesAggregatorFactory.estimateCardinality(combinedAggregators.get(fieldName));
}
@Override
@JsonProperty("fieldName")
public String getName()
{
return fieldName;
}
}

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;
/**
*/
public class HyperUniquesAggregator implements Aggregator
{
private final String name;
private final ObjectColumnSelector selector;
private HyperLogLogCollector collector;
public HyperUniquesAggregator(
String name,
ObjectColumnSelector selector
)
{
this.name = name;
this.selector = selector;
this.collector = HyperLogLogCollector.makeLatestCollector();
}
@Override
public void aggregate()
{
collector.fold((HyperLogLogCollector) selector.get());
}
@Override
public void reset()
{
collector = HyperLogLogCollector.makeLatestCollector();
}
@Override
public Object get()
{
return collector;
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException();
}
@Override
public String getName()
{
return name;
}
@Override
public Aggregator clone()
{
return new HyperUniquesAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,210 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.metamx.common.IAE;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
*/
public class HyperUniquesAggregatorFactory implements AggregatorFactory
{
public static Object estimateCardinality(Object object)
{
if (object == null) {
return 0;
}
return ((HyperLogLogCollector) object).estimateCardinality();
}
private static final byte CACHE_TYPE_ID = 0x5;
private final String name;
private final String fieldName;
@JsonCreator
public HyperUniquesAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName
)
{
this.name = name;
this.fieldName = fieldName.toLowerCase();
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new NoopAggregator(name);
}
if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
return new HyperUniquesAggregator(name, selector);
}
throw new IAE(
"Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new NoopBufferAggregator();
}
if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
return new HyperUniquesBufferAggregator(selector);
}
throw new IAE(
"Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
);
}
@Override
public Comparator getComparator()
{
return new Comparator<HyperLogLogCollector>()
{
@Override
public int compare(HyperLogLogCollector lhs, HyperLogLogCollector rhs)
{
return lhs.compareTo(rhs);
}
};
}
@Override
public Object combine(Object lhs, Object rhs)
{
if (rhs == null) {
return lhs;
}
if (lhs == null) {
return rhs;
}
return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new HyperUniquesAggregatorFactory(name, name);
}
@Override
public Object deserialize(Object object)
{
if (object instanceof byte[]) {
return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
} else if (object instanceof ByteBuffer) {
return HyperLogLogCollector.makeCollector((ByteBuffer) object);
} else if (object instanceof String) {
return HyperLogLogCollector.makeCollector(
ByteBuffer.wrap(Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8)))
);
}
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return estimateCardinality(object);
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(fieldName);
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@Override
public String getTypeName()
{
return "hyperUnique";
}
@Override
public int getMaxIntermediateSize()
{
return HyperLogLogCollector.getLatestNumBytesForDenseStorage();
}
@Override
public Object getAggregatorStartValue()
{
return HyperLogLogCollector.makeLatestCollector();
}
@Override
public String toString()
{
return "HyperUniquesAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class HyperUniquesBufferAggregator implements BufferAggregator
{
private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
private final ObjectColumnSelector selector;
public HyperUniquesBufferAggregator(
ObjectColumnSelector selector
)
{
this.selector = selector;
}
@Override
public void init(ByteBuffer buf, int position)
{
final ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.put(EMPTY_BYTES);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
HyperLogLogCollector collector = (HyperLogLogCollector) selector.get();
if (collector == null) {
return;
}
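// wrap the slice of the shared buffer that holds this aggregator's dense collector and fold into it in place;
// duplicate() keeps the original buffer's position and limit untouched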
HyperLogLogCollector.makeCollector(
(ByteBuffer) buf.duplicate().position(position).limit(
position
+ HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
).fold(collector);
}
@Override
public Object get(ByteBuffer buf, int position)
{
ByteBuffer dataCopyBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.get(dataCopyBuffer.array());
return HyperLogLogCollector.makeCollector(dataCopyBuffer);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,148 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.google.common.base.Charsets;
import com.google.common.collect.Ordering;
import com.google.common.hash.HashFunction;
import io.druid.data.input.InputRow;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
public class HyperUniquesSerde extends ComplexMetricSerde
{
private static Ordering<HyperLogLogCollector> comparator = new Ordering<HyperLogLogCollector>()
{
@Override
public int compare(
HyperLogLogCollector arg1, HyperLogLogCollector arg2
)
{
return arg1.toByteBuffer().compareTo(arg2.toByteBuffer());
}
}.nullsFirst();
private final HashFunction hashFn;
public HyperUniquesSerde(
HashFunction hashFn
)
{
this.hashFn = hashFn;
}
@Override
public String getTypeName()
{
return "hyperUnique";
}
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<HyperLogLogCollector> extractedClass()
{
return HyperLogLogCollector.class;
}
@Override
public HyperLogLogCollector extractValue(InputRow inputRow, String metricName)
{
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
List<String> dimValues = inputRow.getDimension(metricName);
if (dimValues == null) {
return collector;
}
for (String dimensionValue : dimValues) {
collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes());
}
return collector;
}
};
}
@Override
public ColumnPartSerde deserializeColumn(
ByteBuffer byteBuffer, ColumnBuilder columnBuilder
)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
return new ComplexColumnPartSerde(column, getTypeName());
}
@Override
public ObjectStrategy getObjectStrategy()
{
return new ObjectStrategy<HyperLogLogCollector>()
{
@Override
public Class<? extends HyperLogLogCollector> getClazz()
{
return HyperLogLogCollector.class;
}
@Override
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{
buffer.limit(buffer.position() + numBytes);
int remaining = buffer.remaining();
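// same version check as HyperLogLogCollector.makeCollector(): V0 buffers are a multiple of 3 bytes or exactly 1027 bytes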
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
}
@Override
public byte[] toBytes(HyperLogLogCollector collector)
{
if (collector == null) {
return new byte[]{};
}
ByteBuffer val = collector.toByteBuffer();
byte[] retVal = new byte[val.remaining()];
val.asReadOnlyBuffer().get(retVal);
return retVal;
}
@Override
public int compare(HyperLogLogCollector o1, HyperLogLogCollector o2)
{
return comparator.compare(o1, o2);
}
};
}
}

View File

@ -25,6 +25,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
@ -48,7 +49,7 @@ import java.util.List;
*/
public class QueryRunnerTestHelper
{
public static final String segmentId= "testSegment";
public static final String segmentId = "testSegment";
public static final String dataSource = "testing";
public static final QueryGranularity dayGran = QueryGranularity.DAY;
public static final QueryGranularity allGran = QueryGranularity.ALL;
@ -57,9 +58,15 @@ public class QueryRunnerTestHelper
public static final String placementDimension = "placement";
public static final String placementishDimension = "placementish";
public static final String indexMetric = "index";
public static final String uniqueMetric = "uniques";
public static final String addRowsIndexConstantMetric = "addRowsIndexConstant";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
public static final HyperUniquesAggregatorFactory qualityUniques = new HyperUniquesAggregatorFactory(
"uniques",
"quality_uniques"
);
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
@ -67,7 +74,15 @@ public class QueryRunnerTestHelper
new ArithmeticPostAggregator(
"addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
);
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(rowsCount, indexDoubleSum);
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(
rowsCount,
indexDoubleSum,
qualityUniques
);
public static final double UNIQUES_9 = 9.019833517963864;
public static final double UNIQUES_2 = 2.000977198748901d;
public static final double UNIQUES_1 = 1.0002442201269182d;
public static final String[] expectedFullOnIndexValues = new String[]{
"4500.0", "6077.949111938477", "4922.488838195801", "5726.140853881836", "4698.468170166016",

View File

@ -0,0 +1,793 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
/**
*/
public class HyperLogLogCollectorTest
{
private final HashFunction fn = Hashing.murmur3_128();
private final Random random = new Random();
@Test
public void testFolding() throws Exception
{
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < numThings; ++i) {
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
allCombined.add(hashedVal);
if (i % 2 == 0) {
oneHalf.add(hashedVal);
} else {
otherHalf.add(hashedVal);
}
}
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
folded.fold(oneHalf);
Assert.assertEquals(oneHalf, folded);
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
folded.fold(otherHalf);
Assert.assertEquals(allCombined, folded);
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
}
}
// @Test
public void testHighCardinalityRollingFold() throws Exception
{
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
final HyperLogLogCollector simple = HyperLogLogCollector.makeLatestCollector();
int count;
MessageDigest md = MessageDigest.getInstance("SHA-1");
HyperLogLogCollector tmp = HyperLogLogCollector.makeLatestCollector();
for (count = 0; count < 5000000; ++count) {
md.update(Integer.toString(count).getBytes());
byte[] hashed = fn.hashBytes(md.digest()).asBytes();
tmp.add(hashed);
simple.add(hashed);
if (count % 100 == 0) {
rolling.fold(tmp);
tmp = HyperLogLogCollector.makeLatestCollector();
}
}
int n = count;
System.out.println("True cardinality " + n);
System.out.println("Rolling buffer cardinality " + rolling.estimateCardinality());
System.out.println("Simple buffer cardinality " + simple.estimateCardinality());
System.out.println(
String.format(
"Rolling cardinality estimate off by %4.1f%%",
100 * (1 - rolling.estimateCardinality() / n)
)
);
Assert.assertEquals(n, simple.estimateCardinality(), n * 0.05);
Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05);
}
//@Test
public void testHighCardinalityRollingFold2() throws Exception
{
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
int count;
long start = System.currentTimeMillis();
for (count = 0; count < 5000000; ++count) {
HyperLogLogCollector theCollector = HyperLogLogCollector.makeLatestCollector();
theCollector.add(fn.hashLong(count).asBytes());
rolling.fold(theCollector);
}
System.out.printf("testHighCardinalityRollingFold2 took %d ms%n", System.currentTimeMillis() - start);
int n = count;
System.out.println("True cardinality " + n);
System.out.println("Rolling buffer cardinality " + rolling.estimateCardinality());
System.out.println(
String.format(
"Rolling cardinality estimate off by %4.1f%%",
100 * (1 - rolling.estimateCardinality() / n)
)
);
Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05);
}
@Test
public void testFoldingByteBuffers() throws Exception
{
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < numThings; ++i) {
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
allCombined.add(hashedVal);
if (i % 2 == 0) {
oneHalf.add(hashedVal);
} else {
otherHalf.add(hashedVal);
}
}
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
folded.fold(oneHalf.toByteBuffer());
Assert.assertEquals(oneHalf, folded);
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
folded.fold(otherHalf.toByteBuffer());
Assert.assertEquals(allCombined, folded);
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
}
}
@Test
public void testFoldingReadOnlyByteBuffers() throws Exception
{
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < numThings; ++i) {
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
allCombined.add(hashedVal);
if (i % 2 == 0) {
oneHalf.add(hashedVal);
} else {
otherHalf.add(hashedVal);
}
}
HyperLogLogCollector folded = HyperLogLogCollector.makeCollector(
ByteBuffer.wrap(HyperLogLogCollector.makeEmptyVersionedByteArray())
.asReadOnlyBuffer()
);
folded.fold(oneHalf.toByteBuffer());
Assert.assertEquals(oneHalf, folded);
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
folded.fold(otherHalf.toByteBuffer());
Assert.assertEquals(allCombined, folded);
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
}
}
@Test
public void testFoldingReadOnlyByteBuffersWithArbitraryPosition() throws Exception
{
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < numThings; ++i) {
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
allCombined.add(hashedVal);
if (i % 2 == 0) {
oneHalf.add(hashedVal);
} else {
otherHalf.add(hashedVal);
}
}
HyperLogLogCollector folded = HyperLogLogCollector.makeCollector(
shiftedBuffer(
ByteBuffer.wrap(HyperLogLogCollector.makeEmptyVersionedByteArray())
.asReadOnlyBuffer(),
17
)
);
folded.fold(oneHalf.toByteBuffer());
Assert.assertEquals(oneHalf, folded);
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
folded.fold(otherHalf.toByteBuffer());
Assert.assertEquals(allCombined, folded);
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
}
}
@Test
public void testFoldWithDifferentOffsets1() throws Exception
{
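    // Collector buffers built by makeCollectorBuffer() start with a header: a version byte,
    // a register-offset byte, a numNonZero short, and two further header fields (a byte and
    // a short), followed by the packed 4-bit register bytes the assertions below walk over.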
ByteBuffer biggerOffset = makeCollectorBuffer(1, (byte) 0x00, 0x11);
ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00);
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(biggerOffset);
collector.fold(smallerOffset);
ByteBuffer outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 1);
Assert.assertEquals(outBuffer.getShort(), 2047);
outBuffer.get();
outBuffer.getShort();
Assert.assertEquals(outBuffer.get(), 0x10);
while (outBuffer.hasRemaining()) {
Assert.assertEquals(outBuffer.get(), 0x11);
}
collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(smallerOffset);
collector.fold(biggerOffset);
outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 1);
Assert.assertEquals(outBuffer.getShort(), 2047);
Assert.assertEquals(outBuffer.get(), 0);
Assert.assertEquals(outBuffer.getShort(), 0);
Assert.assertEquals(outBuffer.get(), 0x10);
while (outBuffer.hasRemaining()) {
Assert.assertEquals(outBuffer.get(), 0x11);
}
}
@Test
public void testFoldWithArbitraryInitialPositions() throws Exception
{
ByteBuffer biggerOffset = shiftedBuffer(makeCollectorBuffer(1, (byte) 0x00, 0x11), 10);
ByteBuffer smallerOffset = shiftedBuffer(makeCollectorBuffer(0, (byte) 0x20, 0x00), 15);
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(biggerOffset);
collector.fold(smallerOffset);
ByteBuffer outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 1);
Assert.assertEquals(outBuffer.getShort(), 2047);
outBuffer.get();
outBuffer.getShort();
Assert.assertEquals(outBuffer.get(), 0x10);
while (outBuffer.hasRemaining()) {
Assert.assertEquals(outBuffer.get(), 0x11);
}
collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(smallerOffset);
collector.fold(biggerOffset);
outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 1);
Assert.assertEquals(outBuffer.getShort(), 2047);
outBuffer.get();
outBuffer.getShort();
Assert.assertEquals(outBuffer.get(), 0x10);
while (outBuffer.hasRemaining()) {
Assert.assertEquals(outBuffer.get(), 0x11);
}
}
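  // Copies buf into a larger buffer whose content begins at a non-zero position,
  // to exercise collectors backed by buffers with arbitrary starting positions.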
protected ByteBuffer shiftedBuffer(ByteBuffer buf, int offset)
{
ByteBuffer shifted = ByteBuffer.allocate(buf.remaining() + offset);
shifted.position(offset);
shifted.put(buf);
shifted.position(offset);
return shifted;
}
@Test
public void testFoldWithDifferentOffsets2() throws Exception
{
ByteBuffer biggerOffset = makeCollectorBuffer(1, (byte) 0x01, 0x11);
ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00);
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(biggerOffset);
collector.fold(smallerOffset);
ByteBuffer outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 2);
Assert.assertEquals(outBuffer.getShort(), 0);
outBuffer.get();
outBuffer.getShort();
Assert.assertFalse(outBuffer.hasRemaining());
collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(smallerOffset);
collector.fold(biggerOffset);
outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 2);
Assert.assertEquals(outBuffer.getShort(), 0);
outBuffer.get();
outBuffer.getShort();
Assert.assertFalse(outBuffer.hasRemaining());
}
@Test
public void testFoldWithUpperNibbleTriggersOffsetChange() throws Exception
{
byte[] arr1 = new byte[HyperLogLogCollector.getLatestNumBytesForDenseStorage()];
Arrays.fill(arr1, (byte) 0x11);
ByteBuffer buffer1 = ByteBuffer.wrap(arr1);
buffer1.put(0, HLLCV1.VERSION);
buffer1.put(1, (byte) 0);
buffer1.putShort(2, (short) (2047));
buffer1.put(HLLCV1.HEADER_NUM_BYTES, (byte) 0x1);
byte[] arr2 = new byte[HyperLogLogCollector.getLatestNumBytesForDenseStorage()];
Arrays.fill(arr2, (byte) 0x11);
ByteBuffer buffer2 = ByteBuffer.wrap(arr2);
buffer2.put(0, HLLCV1.VERSION);
buffer2.put(1, (byte) 0);
buffer2.putShort(2, (short) (2048));
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer1);
collector.fold(buffer2);
ByteBuffer outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), HLLCV1.VERSION);
Assert.assertEquals(outBuffer.get(), 1);
Assert.assertEquals(outBuffer.getShort(), 0);
outBuffer.get();
outBuffer.getShort();
Assert.assertFalse(outBuffer.hasRemaining());
}
@Test
public void testSparseFoldWithDifferentOffsets1() throws Exception
{
ByteBuffer biggerOffset = makeCollectorBuffer(1, new byte[]{0x11, 0x10}, 0x11);
ByteBuffer sparse = HyperLogLogCollector.makeCollector(makeCollectorBuffer(0, new byte[]{0x00, 0x02}, 0x00))
.toByteBuffer();
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(biggerOffset);
collector.fold(sparse);
ByteBuffer outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 2);
Assert.assertEquals(outBuffer.getShort(), 0);
Assert.assertEquals(outBuffer.get(), 0);
Assert.assertEquals(outBuffer.getShort(), 0);
Assert.assertFalse(outBuffer.hasRemaining());
collector = HyperLogLogCollector.makeLatestCollector();
collector.fold(sparse);
collector.fold(biggerOffset);
outBuffer = collector.toByteBuffer();
Assert.assertEquals(outBuffer.get(), collector.getVersion());
Assert.assertEquals(outBuffer.get(), 2);
Assert.assertEquals(outBuffer.getShort(), 0);
Assert.assertEquals(outBuffer.get(), 0);
Assert.assertEquals(outBuffer.getShort(), 0);
Assert.assertFalse(outBuffer.hasRemaining());
}
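  // Builds a read-only, dense collector buffer with the given register offset, the supplied
  // leading register byte(s), and remainingBytes repeated for every other register byte.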
private ByteBuffer makeCollectorBuffer(int offset, byte initialBytes, int remainingBytes)
{
return makeCollectorBuffer(offset, new byte[]{initialBytes}, remainingBytes);
}
private ByteBuffer makeCollectorBuffer(int offset, byte[] initialBytes, int remainingBytes)
{
short numNonZero = 0;
for (byte initialByte : initialBytes) {
numNonZero += computeNumNonZero(initialByte);
}
final short numNonZeroInRemaining = computeNumNonZero((byte) remainingBytes);
numNonZero += (HyperLogLogCollector.NUM_BYTES_FOR_BUCKETS - initialBytes.length) * numNonZeroInRemaining;
ByteBuffer biggerOffset = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
biggerOffset.put(HLLCV1.VERSION);
biggerOffset.put((byte) offset);
biggerOffset.putShort(numNonZero);
biggerOffset.put((byte) 0);
biggerOffset.putShort((short) 0);
biggerOffset.put(initialBytes);
while (biggerOffset.hasRemaining()) {
biggerOffset.put((byte) remainingBytes);
}
biggerOffset.clear();
return biggerOffset.asReadOnlyBuffer();
}
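  // Each register byte packs two 4-bit buckets; count how many of the two nibbles are non-zero.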
private short computeNumNonZero(byte theByte)
{
short retVal = 0;
if ((theByte & 0x0f) > 0) {
++retVal;
}
if ((theByte & 0xf0) > 0) {
++retVal;
}
return retVal;
}
  //@Test // This test can help find combinations that behave oddly, but it is non-deterministic
public void testFoldingwithDifferentOffsets() throws Exception
{
for (int j = 0; j < 10; j++) {
HyperLogLogCollector smallVals = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector bigVals = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector all = HyperLogLogCollector.makeLatestCollector();
int numThings = 500000;
for (int i = 0; i < numThings; i++) {
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
if (i < 1000) {
smallVals.add(hashedVal);
} else {
bigVals.add(hashedVal);
}
all.add(hashedVal);
}
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
folded.fold(smallVals);
folded.fold(bigVals);
final double expected = all.estimateCardinality();
Assert.assertEquals(expected, folded.estimateCardinality(), expected * 0.025);
Assert.assertEquals(numThings, folded.estimateCardinality(), numThings * 0.05);
}
}
//@Test
public void testFoldingwithDifferentOffsets2() throws Exception
{
MessageDigest md = MessageDigest.getInstance("SHA-1");
for (int j = 0; j < 1; j++) {
HyperLogLogCollector evenVals = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector oddVals = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector all = HyperLogLogCollector.makeLatestCollector();
int numThings = 500000;
for (int i = 0; i < numThings; i++) {
md.update(Integer.toString(random.nextInt()).getBytes());
byte[] hashedVal = fn.hashBytes(md.digest()).asBytes();
if (i % 2 == 0) {
evenVals.add(hashedVal);
} else {
oddVals.add(hashedVal);
}
all.add(hashedVal);
}
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
folded.fold(evenVals);
folded.fold(oddVals);
final double expected = all.estimateCardinality();
Assert.assertEquals(expected, folded.estimateCardinality(), expected * 0.025);
Assert.assertEquals(numThings, folded.estimateCardinality(), numThings * 0.05);
}
}
@Test
public void testEstimation() throws Exception
{
Random random = new Random(0l);
final int[] valsToCheck = {10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000};
final double[] expectedVals = {
11.029647221949576, 21.108407720752034, 51.64575281885815, 100.42231726408892,
981.8579991802412, 1943.1337257462792, 4946.192042635218, 9935.088157579434,
20366.1486889433, 49433.56029693898, 100615.26273314281, 980831.624899156000,
1982408.2608981386
};
int valsToCheckIndex = 0;
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < valsToCheck[valsToCheck.length - 1]; ++i) {
collector.add(fn.hashLong(random.nextLong()).asBytes());
if (i == valsToCheck[valsToCheckIndex]) {
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
++valsToCheckIndex;
}
}
Assert.assertEquals(expectedVals.length, valsToCheckIndex + 1);
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
}
@Test
public void testEstimationReadOnlyByteBuffers() throws Exception
{
Random random = new Random(0l);
final int[] valsToCheck = {10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000};
final double[] expectedVals = {
11.029647221949576, 21.108407720752034, 51.64575281885815, 100.42231726408892,
981.8579991802412, 1943.1337257462792, 4946.192042635218, 9935.088157579434,
20366.1486889433, 49433.56029693898, 100615.26273314281, 980831.624899156000,
1982408.2608981386
};
int valsToCheckIndex = 0;
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
ByteBuffer.allocateDirect(
HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
);
for (int i = 0; i < valsToCheck[valsToCheck.length - 1]; ++i) {
collector.add(fn.hashLong(random.nextLong()).asBytes());
if (i == valsToCheck[valsToCheckIndex]) {
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
++valsToCheckIndex;
}
}
Assert.assertEquals(expectedVals.length, valsToCheckIndex + 1);
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
}
@Test
public void testEstimationLimitDifferentFromCapacity() throws Exception
{
Random random = new Random(0l);
final int[] valsToCheck = {10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000};
final double[] expectedVals = {
11.029647221949576, 21.108407720752034, 51.64575281885815, 100.42231726408892,
981.8579991802412, 1943.1337257462792, 4946.192042635218, 9935.088157579434,
20366.1486889433, 49433.56029693898, 100615.26273314281, 980831.624899156000,
1982408.2608981386
};
int valsToCheckIndex = 0;
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
(ByteBuffer) ByteBuffer.allocate(10000)
.position(0)
.limit(HyperLogLogCollector.getLatestNumBytesForDenseStorage())
);
for (int i = 0; i < valsToCheck[valsToCheck.length - 1]; ++i) {
collector.add(fn.hashLong(random.nextLong()).asBytes());
if (i == valsToCheck[valsToCheckIndex]) {
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
++valsToCheckIndex;
}
}
Assert.assertEquals(expectedVals.length, valsToCheckIndex + 1);
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
}
@Test
public void testSparseEstimation() throws Exception
{
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < 100; ++i) {
collector.add(fn.hashLong(random.nextLong()).asBytes());
}
Assert.assertEquals(
collector.estimateCardinality(), collector.estimateByteBuffer(collector.toByteBuffer()), 0.0d
);
}
@Test
public void testHighBits() throws Exception
{
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
// fill up all the buckets so we reach a registerOffset of 49
fillBuckets(collector, (byte) 0, (byte) 49);
// highest possible bit position is 64
collector.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
Assert.assertEquals(8.5089685793441677E17, collector.estimateCardinality(), 1000);
// this might happen once in a million years if you hash a billion values a second
fillBuckets(collector, (byte) 0, (byte) 63);
collector.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
Assert.assertEquals(Double.MAX_VALUE, collector.estimateCardinality(), 1000);
}
@Test
public void testCompare1() throws Exception
{
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
collector1.add(fn.hashLong(0).asBytes());
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
Comparator comparator = factory.getComparator();
for (int i = 1; i < 100; i = i + 2) {
collector1.add(fn.hashLong(i).asBytes());
collector2.add(fn.hashLong(i + 1).asBytes());
Assert.assertEquals(1, comparator.compare(collector1, collector2));
Assert.assertEquals(1, Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()));
}
}
@Test
public void testCompare2() throws Exception
{
Random rand = new Random(0);
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
Comparator comparator = factory.getComparator();
for (int i = 1; i < 1000; ++i) {
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(50);
for (int l = 0; l < j; ++l) {
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
}
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
int k = j + 1 + rand.nextInt(5);
for (int l = 0; l < k; ++l) {
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
}
Assert.assertEquals(
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
comparator.compare(collector1, collector2)
);
}
for (int i = 1; i < 100; ++i) {
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(500);
for (int l = 0; l < j; ++l) {
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
}
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
int k = j + 2 + rand.nextInt(5);
for (int l = 0; l < k; ++l) {
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
}
Assert.assertEquals(
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
comparator.compare(collector1, collector2)
);
}
for (int i = 1; i < 10; ++i) {
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(100000);
for (int l = 0; l < j; ++l) {
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
}
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
int k = j + 20000 + rand.nextInt(100000);
for (int l = 0; l < k; ++l) {
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
}
Assert.assertEquals(
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
comparator.compare(collector1, collector2)
);
}
}
private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
{
byte offset = startOffset;
while (offset <= endOffset) {
// fill buckets to shift registerOffset
for (short bucket = 0; bucket < 2048; ++bucket) {
collector.add(bucket, offset);
}
offset++;
}
}
// Provides a nice printout of error rates as a function of cardinality
//@Test
public void showErrorRate() throws Exception
{
HashFunction fn = Hashing.murmur3_128();
Random random = new Random();
double error = 0.0d;
int count = 0;
final int[] valsToCheck = {
10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000, 10000000, Integer.MAX_VALUE
};
for (int numThings : valsToCheck) {
long startTime = System.currentTimeMillis();
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < numThings; ++i) {
if (i != 0 && i % 100000000 == 0) {
++count;
error = computeError(error, count, i, startTime, collector);
}
collector.add(fn.hashLong(random.nextLong()).asBytes());
}
++count;
error = computeError(error, count, numThings, startTime, collector);
}
}
private double computeError(double error, int count, int numThings, long startTime, HyperLogLogCollector collector)
{
final double estimatedValue = collector.estimateCardinality();
final double errorThisTime = Math.abs((double) numThings - estimatedValue) / numThings;
error += errorThisTime;
System.out.printf(
"%,d ==? %,f in %,d millis. actual error[%,f%%], avg. error [%,f%%]%n",
numThings,
estimatedValue,
System.currentTimeMillis() - startTime,
100 * errorThisTime,
(error / count) * 100
);
return error;
}
}
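The tests above exercise a simple collector lifecycle: create, add hashed values, fold, estimate. A minimal sketch of that usage, assuming only the `HyperLogLogCollector` and Guava `Hashing` calls shown in these tests (the wrapper class and `main` method are illustrative, not part of this commit):

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

public class HllUsageSketch
{
  public static void main(String[] args)
  {
    final HashFunction fn = Hashing.murmur3_128();

    // Two collectors over disjoint halves of the same value stream.
    final HyperLogLogCollector evens = HyperLogLogCollector.makeLatestCollector();
    final HyperLogLogCollector odds = HyperLogLogCollector.makeLatestCollector();
    for (long i = 0; i < 1000; ++i) {
      // Values must be hashed before being added to the collector.
      byte[] hashed = fn.hashLong(i).asBytes();
      if (i % 2 == 0) {
        evens.add(hashed);
      } else {
        odds.add(hashed);
      }
    }

    // Folding merges registers, giving an estimate for the union of both streams.
    evens.fold(odds);
    System.out.println("estimated cardinality: " + evens.estimateCardinality());
  }
}
```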
View File
@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.junit.Assert;
import org.junit.Test;
import java.util.Random;
/**
*/
public class HyperUniqueFinalizingPostAggregatorTest
{
private final HashFunction fn = Hashing.murmur3_128();
@Test
public void testCompute() throws Exception
{
Random random = new Random(0l);
HyperUniqueFinalizingPostAggregator postAggregator = new HyperUniqueFinalizingPostAggregator(
"uniques"
);
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < 100; ++i) {
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
collector.add(hashedVal);
}
double cardinality = (Double) postAggregator.compute(ImmutableMap.<String, Object>of("uniques", collector));
Assert.assertTrue(cardinality == 99.37233005831612);
}
}
View File
@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import junit.framework.Assert;
import org.junit.Test;
public class HyperUniquesAggregatorFactoryTest
{
final static HyperUniquesAggregatorFactory aggregatorFactory = new HyperUniquesAggregatorFactory(
"hyperUnique",
"uniques"
);
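  // A collector serialized with the legacy V0 layout, used to verify that old values still deserialize and combine.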
final static String V0_BASE64 = "AAYbEyQwFyQVASMCVFEQQgEQIxIhM4ISAQMhUkICEDFDIBMhMgFQFAFAMjAAEhEREyVAEiUBAhIjISATMCECMiERIRIiVRFRAyIAEgFCQSMEJAITATAAEAMQgCEBEjQiAyUTAyEQASJyAGURAAISAwISATETQhAREBYDIVIlFTASAzJgERIgRCcmUyAwNAMyEJMjIhQXQhEWECABQDETATEREjIRAgEyIiMxMBQiAkBBMDYAMEQQACMzMhIkMTQSkYIRABIBADMBAhIEISAENkEBQDAxETMAIEEwEzQiQSEVQSFBBAQDICIiAVIAMTAQIQYBIRABADMDEzEAQSMkEiAYFBAQI0AmECEyQSARRTIVMhEkMiKAMCUBxUghAkIBI3EmMAQiACEAJDJCAAADOzESEDBCRjMgEUQQETQwEWIhA6MlAiAAZDI1AgEIIDUyFDIHMQEEAwIRBRABBStCZCQhAgJSMQIiQEEURTBmM1MxACIAETGhMgQnBRICNiIREyIUNAEAAkABAwQSEBJBIhIhIRERAiIRACUhEUAVMkQGEVMjECYjACBwEQQSIRIgAAEyExQUFSEAIBJCIDIDYTAgMiNBIUADUiETADMoFEADETMCIwUEQkIAESMSIzIABDERIXEhIiACQgUSEgJiQCAUARIRAREDQiEUAkQgAgQiIEAzIxRCARIgBAAVAzMAECEwE0Qh8gAAASEhEiAiMhUxcRImIVABATYyUBAwIoE1QhRDIiYBIBEBEiQSQyERAAADMAARAEACFYUwQSQBIRIgURITARFSEzEHEBACOTMREBIAMjIgEhU0cxEQIRIhIi1wEgMRUBEgMQIRAnAVASURMHQBAiEyBSAAEBQTAWQ5EQA0IUMSISAUEiASIjIhMhMFJBBSEjEAECEwACASEQFBAjARITEQIgYTEKEAeAAiMkEyARowARFBAicRISIBIxAQAgEBARMCIRQgMSIVIAkjMxIAIEMyADASMgFRIjEyKjEjBBIEQCUAARYBEQMxMCIBACNCACRCMlEzUUAAUDM1MhAjEgAxAAISAVFQECAhQAMBMhEzEgASNxAhFRIxECMRJBQAERAToBgQMhJSRQFAEhAwMiIhMQAwAgQiBQJiIGMQQhEiQxR1MiAjIAIEEiAkARECEzQlMjECIRATBgIhEBQAIQAEATEjBCMwAgMBMhAhIyFBIxQAARI1AAEABCIDFBIRUzMBIgAgEiARQCASMQQDQCFBAQAUJwMUElAyIAIRBSIRITICEAIxMAEUBEYTcBMBEEIxMREwIRIDAGIAEgYxBAEANCAhBAI2UhIiIgIRABIEVRAwNEIQERQgEFMhFCQSIAEhQDMTEQMiAjJyEQ==";
@Test
public void testDeserializeV0() throws Exception
{
Object v0 = aggregatorFactory.deserialize(V0_BASE64);
Assert.assertEquals("deserialized value is HLLCV0", HLLCV0.class, v0.getClass());
}
@Test
public void testCombineStartValueV0() throws Exception
{
Object combined = aggregatorFactory.getAggregatorStartValue();
aggregatorFactory.combine(combined, aggregatorFactory.deserialize(V0_BASE64));
}
}
View File
@ -85,7 +85,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexDoubleSum
QueryRunnerTestHelper.indexDoubleSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -128,6 +129,11 @@ public class TimeseriesQueryRunnerTest
value.getDoubleMetric("addRowsIndexConstant"),
0.0
);
Assert.assertEquals(
value.getDoubleMetric("uniques"),
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0.0d : 9.0d,
0.02
);
expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis()));
++count;
@ -182,7 +188,12 @@ public class TimeseriesQueryRunnerTest
.granularity(QueryRunnerTestHelper.dayGran)
.filters(QueryRunnerTestHelper.providerDimension, "upfront")
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.rowsCount))
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.qualityUniques
)
)
.build();
Assert.assertEquals(
@ -215,6 +226,14 @@ public class TimeseriesQueryRunnerTest
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0L : 2L,
value.getLongMetric("rows").longValue()
);
Assert.assertEquals(
result.toString(),
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0.0d : 2.0d,
value.getDoubleMetric(
"uniques"
),
0.01
);
expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis()));
}
@ -233,7 +252,8 @@ public class TimeseriesQueryRunnerTest
new LongSumAggregatorFactory(
"idx",
"index"
)
),
QueryRunnerTestHelper.qualityUniques
)
)
.build();
@ -242,13 +262,13 @@ public class TimeseriesQueryRunnerTest
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L)
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
@ -327,7 +347,8 @@ public class TimeseriesQueryRunnerTest
new LongSumAggregatorFactory(
"idx",
"index"
)
),
QueryRunnerTestHelper.qualityUniques
)
)
.build();
@ -336,7 +357,7 @@ public class TimeseriesQueryRunnerTest
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
@ -363,7 +384,8 @@ public class TimeseriesQueryRunnerTest
new LongSumAggregatorFactory(
"idx",
"index"
)
),
QueryRunnerTestHelper.qualityUniques
)
)
.build();
@ -372,7 +394,7 @@ public class TimeseriesQueryRunnerTest
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
@ -457,7 +479,8 @@ public class TimeseriesQueryRunnerTest
new LongSumAggregatorFactory(
"idx",
"index"
)
),
QueryRunnerTestHelper.qualityUniques
)
)
.build();
@ -466,7 +489,7 @@ public class TimeseriesQueryRunnerTest
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
@ -494,7 +517,8 @@ public class TimeseriesQueryRunnerTest
new LongSumAggregatorFactory(
"idx",
"index"
)
),
QueryRunnerTestHelper.qualityUniques
)
)
.build();
@ -503,7 +527,7 @@ public class TimeseriesQueryRunnerTest
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
@ -561,7 +585,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -574,7 +599,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 13L,
"index", 6619L,
"addRowsIndexConstant", 6633.0
"addRowsIndexConstant", 6633.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
),
@ -584,7 +610,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 13L,
"index", 5827L,
"addRowsIndexConstant", 5841.0
"addRowsIndexConstant", 5841.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
)
@ -608,7 +635,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -621,7 +649,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 11L,
"index", 3783L,
"addRowsIndexConstant", 3795.0
"addRowsIndexConstant", 3795.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
),
@ -631,7 +660,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 11L,
"index", 3313L,
"addRowsIndexConstant", 3325.0
"addRowsIndexConstant", 3325.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
)
@ -655,7 +685,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -668,7 +699,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 9L,
"index", 1102L,
"addRowsIndexConstant", 1112.0
"addRowsIndexConstant", 1112.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
),
@ -678,7 +710,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 9L,
"index", 1120L,
"addRowsIndexConstant", 1130.0
"addRowsIndexConstant", 1130.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
)
@ -702,7 +735,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -715,7 +749,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 2681L,
"addRowsIndexConstant", 2684.0
"addRowsIndexConstant", 2684.0,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
),
@ -725,7 +760,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 2193L,
"addRowsIndexConstant", 2196.0
"addRowsIndexConstant", 2196.0,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
)
@ -749,7 +785,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -762,7 +799,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 2836L,
"addRowsIndexConstant", 2839.0
"addRowsIndexConstant", 2839.0,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
),
@ -772,7 +810,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 2514L,
"addRowsIndexConstant", 2517.0
"addRowsIndexConstant", 2517.0,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
)
@ -818,7 +857,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 254.4554443359375D,
"addRowsIndexConstant", 257.4554443359375D
"addRowsIndexConstant", 257.4554443359375D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
),
@ -828,7 +868,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 260.4129638671875D,
"addRowsIndexConstant", 263.4129638671875D
"addRowsIndexConstant", 263.4129638671875D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
)
@ -874,7 +915,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", new Float(135.885094).doubleValue(),
"addRowsIndexConstant", new Float(137.885094).doubleValue()
"addRowsIndexConstant", new Float(137.885094).doubleValue(),
"uniques", QueryRunnerTestHelper.UNIQUES_1
)
)
),
@ -884,7 +926,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", new Float(147.425935).doubleValue(),
"addRowsIndexConstant", new Float(149.425935).doubleValue()
"addRowsIndexConstant", new Float(149.425935).doubleValue(),
"uniques", QueryRunnerTestHelper.UNIQUES_1
)
)
)
@ -930,7 +973,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", new Float(118.570340).doubleValue(),
"addRowsIndexConstant", new Float(120.570340).doubleValue()
"addRowsIndexConstant", new Float(120.570340).doubleValue(),
"uniques", QueryRunnerTestHelper.UNIQUES_1
)
)
),
@ -940,7 +984,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", new Float(112.987027).doubleValue(),
"addRowsIndexConstant", new Float(114.987027).doubleValue()
"addRowsIndexConstant", new Float(114.987027).doubleValue(),
"uniques", QueryRunnerTestHelper.UNIQUES_1
)
)
)
@ -970,7 +1015,8 @@ public class TimeseriesQueryRunnerTest
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
@ -983,7 +1029,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 13L,
"index", 6619L,
"addRowsIndexConstant", 6633.0
"addRowsIndexConstant", 6633.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
),
@ -993,7 +1040,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 13L,
"index", 5827L,
"addRowsIndexConstant", 5841.0
"addRowsIndexConstant", 5841.0,
"uniques", QueryRunnerTestHelper.UNIQUES_9
)
)
)
@ -1043,7 +1091,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 254.4554443359375D,
"addRowsIndexConstant", 257.4554443359375D
"addRowsIndexConstant", 257.4554443359375D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
),
@ -1053,7 +1102,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 260.4129638671875D,
"addRowsIndexConstant", 263.4129638671875D
"addRowsIndexConstant", 263.4129638671875D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
)
@ -1085,7 +1135,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0.0,
"addRowsIndexConstant", 1.0
"addRowsIndexConstant", 1.0,
"uniques", 0.0
)
)
),
@ -1095,7 +1146,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0.0,
"addRowsIndexConstant", 1.0
"addRowsIndexConstant", 1.0,
"uniques", 0.0
)
)
)
@ -1127,7 +1179,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0.0,
"addRowsIndexConstant", 1.0
"addRowsIndexConstant", 1.0,
"uniques", 0.0
)
)
),
@ -1137,7 +1190,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0.0,
"addRowsIndexConstant", 1.0
"addRowsIndexConstant", 1.0,
"uniques", 0.0
)
)
)
@ -1183,7 +1237,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0.0,
"addRowsIndexConstant", 1.0
"addRowsIndexConstant", 1.0,
"uniques", 0.0
)
)
),
@ -1193,7 +1248,8 @@ public class TimeseriesQueryRunnerTest
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0.0,
"addRowsIndexConstant", 1.0
"addRowsIndexConstant", 1.0,
"uniques", 0.0
)
)
)
View File
@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.hash.Hashing;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.google.common.io.LineProcessor;
@ -31,7 +32,10 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -52,14 +56,29 @@ public class TestIndex
private static QueryableIndex mmappedIndex = null;
private static QueryableIndex mergedRealtime = null;
public static final String[] COLUMNS = new String[]{"ts", "provider", "quALIty", "plAcEmEnT", "pLacementish", "iNdEx"};
public static final String[] COLUMNS = new String[]{
"ts",
"provider",
"quALIty",
"plAcEmEnT",
"pLacementish",
"iNdEx",
"qualiTy_Uniques"
};
public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
public static final String[] METRICS = new String[]{"iNdEx"};
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-04-16T00:00:00.000Z");
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0])
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
};
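  // Register the hyperUnique complex-metric serde once so the test index can ingest the column.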
static {
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
}
public static IncrementalIndex getIncrementalTestIndex()
{
synchronized (log) {