mirror of https://github.com/apache/druid.git
port hyperunique to open source
This commit is contained in:
parent
3691d40240
commit
08138688e4
|
@ -82,3 +82,13 @@ All JavaScript functions must return numerical values.
|
|||
"fnReset" : "function() { return 10; }"
|
||||
}
|
||||
```
|
||||
|
||||
### Complex aggregators
|
||||
|
||||
#### `hyperUnique` aggregator
|
||||
|
||||
`hyperUnique` uses [Hyperloglog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension.
|
||||
|
||||
```json
|
||||
{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
|
||||
```
|
||||
|
|
|
@ -64,6 +64,31 @@ Example JavaScript aggregator:
|
|||
"function": "function(delta, total) { return 100 * Math.abs(delta) / total; }"
|
||||
}
|
||||
```
|
||||
### `hyperUniqueCardinality` post-aggregator
|
||||
|
||||
The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations.
|
||||
|
||||
```json
|
||||
{ "type" : "hyperUniqueCardinality", "fieldName" : <the name field value of the hyperUnique aggregator>}
|
||||
```
|
||||
|
||||
It can be used in a sample calculation as so:
|
||||
|
||||
```json
|
||||
"aggregations" : [{
|
||||
{"type" : "count", "name" : "rows"},
|
||||
{"type" : "hyperUnique", "name" : "unique_users", "fieldName" : "uniques"}
|
||||
}],
|
||||
"postAggregations" : {
|
||||
"type" : "arithmetic",
|
||||
"name" : "average_users_per_row",
|
||||
"fn" : "/",
|
||||
"fields" : [
|
||||
{ "type" : "hyperUniqueCardinality", "fieldName" : "unique_users" },
|
||||
{ "type" : "fieldAccess", "name" : "rows", "fieldName" : "rows" }
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### Example Usage
|
||||
|
@ -98,4 +123,4 @@ The format of the query JSON is as follows:
|
|||
...
|
||||
}
|
||||
|
||||
```
|
||||
```
|
|
@ -22,6 +22,7 @@ package io.druid.jackson;
|
|||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.hash.Hashing;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
|
@ -31,10 +32,14 @@ import io.druid.query.aggregation.LongSumAggregatorFactory;
|
|||
import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||
import io.druid.query.aggregation.MinAggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import io.druid.query.aggregation.post.ConstantPostAggregator;
|
||||
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -44,28 +49,38 @@ public class AggregatorsModule extends SimpleModule
|
|||
{
|
||||
super("AggregatorFactories");
|
||||
|
||||
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
|
||||
}
|
||||
|
||||
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
|
||||
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
|
||||
}
|
||||
|
||||
@JsonTypeInfo(use= JsonTypeInfo.Id.NAME, property="type")
|
||||
@JsonSubTypes(value={
|
||||
@JsonSubTypes.Type(name="count", value=CountAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name="longSum", value=LongSumAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name="doubleSum", value=DoubleSumAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name="max", value=MaxAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name="min", value=MinAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name="javascript", value=JavaScriptAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name="histogram", value=HistogramAggregatorFactory.class)
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "count", value = CountAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "longSum", value = LongSumAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "doubleSum", value = DoubleSumAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "max", value = MaxAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "min", value = MinAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class)
|
||||
})
|
||||
public static interface AggregatorFactoryMixin {}
|
||||
public static interface AggregatorFactoryMixin
|
||||
{
|
||||
}
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
|
||||
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
|
||||
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
|
||||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class)
|
||||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
|
||||
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class)
|
||||
})
|
||||
public static interface PostAggregatorMixin {}
|
||||
public static interface PostAggregatorMixin
|
||||
{
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,269 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ByteBitLookup
|
||||
{
|
||||
public static final byte[] lookup;
|
||||
|
||||
static {
|
||||
lookup = new byte[256];
|
||||
|
||||
lookup[0] = 0;
|
||||
lookup[1] = 1;
|
||||
lookup[2] = 2;
|
||||
lookup[3] = 1;
|
||||
lookup[4] = 3;
|
||||
lookup[5] = 1;
|
||||
lookup[6] = 2;
|
||||
lookup[7] = 1;
|
||||
lookup[8] = 4;
|
||||
lookup[9] = 1;
|
||||
lookup[10] = 2;
|
||||
lookup[11] = 1;
|
||||
lookup[12] = 3;
|
||||
lookup[13] = 1;
|
||||
lookup[14] = 2;
|
||||
lookup[15] = 1;
|
||||
lookup[16] = 5;
|
||||
lookup[17] = 1;
|
||||
lookup[18] = 2;
|
||||
lookup[19] = 1;
|
||||
lookup[20] = 3;
|
||||
lookup[21] = 1;
|
||||
lookup[22] = 2;
|
||||
lookup[23] = 1;
|
||||
lookup[24] = 4;
|
||||
lookup[25] = 1;
|
||||
lookup[26] = 2;
|
||||
lookup[27] = 1;
|
||||
lookup[28] = 3;
|
||||
lookup[29] = 1;
|
||||
lookup[30] = 2;
|
||||
lookup[31] = 1;
|
||||
lookup[32] = 6;
|
||||
lookup[33] = 1;
|
||||
lookup[34] = 2;
|
||||
lookup[35] = 1;
|
||||
lookup[36] = 3;
|
||||
lookup[37] = 1;
|
||||
lookup[38] = 2;
|
||||
lookup[39] = 1;
|
||||
lookup[40] = 4;
|
||||
lookup[41] = 1;
|
||||
lookup[42] = 2;
|
||||
lookup[43] = 1;
|
||||
lookup[44] = 3;
|
||||
lookup[45] = 1;
|
||||
lookup[46] = 2;
|
||||
lookup[47] = 1;
|
||||
lookup[48] = 5;
|
||||
lookup[49] = 1;
|
||||
lookup[50] = 2;
|
||||
lookup[51] = 1;
|
||||
lookup[52] = 3;
|
||||
lookup[53] = 1;
|
||||
lookup[54] = 2;
|
||||
lookup[55] = 1;
|
||||
lookup[56] = 4;
|
||||
lookup[57] = 1;
|
||||
lookup[58] = 2;
|
||||
lookup[59] = 1;
|
||||
lookup[60] = 3;
|
||||
lookup[61] = 1;
|
||||
lookup[62] = 2;
|
||||
lookup[63] = 1;
|
||||
lookup[64] = 7;
|
||||
lookup[65] = 1;
|
||||
lookup[66] = 2;
|
||||
lookup[67] = 1;
|
||||
lookup[68] = 3;
|
||||
lookup[69] = 1;
|
||||
lookup[70] = 2;
|
||||
lookup[71] = 1;
|
||||
lookup[72] = 4;
|
||||
lookup[73] = 1;
|
||||
lookup[74] = 2;
|
||||
lookup[75] = 1;
|
||||
lookup[76] = 3;
|
||||
lookup[77] = 1;
|
||||
lookup[78] = 2;
|
||||
lookup[79] = 1;
|
||||
lookup[80] = 5;
|
||||
lookup[81] = 1;
|
||||
lookup[82] = 2;
|
||||
lookup[83] = 1;
|
||||
lookup[84] = 3;
|
||||
lookup[85] = 1;
|
||||
lookup[86] = 2;
|
||||
lookup[87] = 1;
|
||||
lookup[88] = 4;
|
||||
lookup[89] = 1;
|
||||
lookup[90] = 2;
|
||||
lookup[91] = 1;
|
||||
lookup[92] = 3;
|
||||
lookup[93] = 1;
|
||||
lookup[94] = 2;
|
||||
lookup[95] = 1;
|
||||
lookup[96] = 6;
|
||||
lookup[97] = 1;
|
||||
lookup[98] = 2;
|
||||
lookup[99] = 1;
|
||||
lookup[100] = 3;
|
||||
lookup[101] = 1;
|
||||
lookup[102] = 2;
|
||||
lookup[103] = 1;
|
||||
lookup[104] = 4;
|
||||
lookup[105] = 1;
|
||||
lookup[106] = 2;
|
||||
lookup[107] = 1;
|
||||
lookup[108] = 3;
|
||||
lookup[109] = 1;
|
||||
lookup[110] = 2;
|
||||
lookup[111] = 1;
|
||||
lookup[112] = 5;
|
||||
lookup[113] = 1;
|
||||
lookup[114] = 2;
|
||||
lookup[115] = 1;
|
||||
lookup[116] = 3;
|
||||
lookup[117] = 1;
|
||||
lookup[118] = 2;
|
||||
lookup[119] = 1;
|
||||
lookup[120] = 4;
|
||||
lookup[121] = 1;
|
||||
lookup[122] = 2;
|
||||
lookup[123] = 1;
|
||||
lookup[124] = 3;
|
||||
lookup[125] = 1;
|
||||
lookup[126] = 2;
|
||||
lookup[127] = 1;
|
||||
lookup[128] = 8;
|
||||
lookup[129] = 1;
|
||||
lookup[130] = 2;
|
||||
lookup[131] = 1;
|
||||
lookup[132] = 3;
|
||||
lookup[133] = 1;
|
||||
lookup[134] = 2;
|
||||
lookup[135] = 1;
|
||||
lookup[136] = 4;
|
||||
lookup[137] = 1;
|
||||
lookup[138] = 2;
|
||||
lookup[139] = 1;
|
||||
lookup[140] = 3;
|
||||
lookup[141] = 1;
|
||||
lookup[142] = 2;
|
||||
lookup[143] = 1;
|
||||
lookup[144] = 5;
|
||||
lookup[145] = 1;
|
||||
lookup[146] = 2;
|
||||
lookup[147] = 1;
|
||||
lookup[148] = 3;
|
||||
lookup[149] = 1;
|
||||
lookup[150] = 2;
|
||||
lookup[151] = 1;
|
||||
lookup[152] = 4;
|
||||
lookup[153] = 1;
|
||||
lookup[154] = 2;
|
||||
lookup[155] = 1;
|
||||
lookup[156] = 3;
|
||||
lookup[157] = 1;
|
||||
lookup[158] = 2;
|
||||
lookup[159] = 1;
|
||||
lookup[160] = 6;
|
||||
lookup[161] = 1;
|
||||
lookup[162] = 2;
|
||||
lookup[163] = 1;
|
||||
lookup[164] = 3;
|
||||
lookup[165] = 1;
|
||||
lookup[166] = 2;
|
||||
lookup[167] = 1;
|
||||
lookup[168] = 4;
|
||||
lookup[169] = 1;
|
||||
lookup[170] = 2;
|
||||
lookup[171] = 1;
|
||||
lookup[172] = 3;
|
||||
lookup[173] = 1;
|
||||
lookup[174] = 2;
|
||||
lookup[175] = 1;
|
||||
lookup[176] = 5;
|
||||
lookup[177] = 1;
|
||||
lookup[178] = 2;
|
||||
lookup[179] = 1;
|
||||
lookup[180] = 3;
|
||||
lookup[181] = 1;
|
||||
lookup[182] = 2;
|
||||
lookup[183] = 1;
|
||||
lookup[184] = 4;
|
||||
lookup[185] = 1;
|
||||
lookup[186] = 2;
|
||||
lookup[187] = 1;
|
||||
lookup[188] = 3;
|
||||
lookup[189] = 1;
|
||||
lookup[190] = 2;
|
||||
lookup[191] = 1;
|
||||
lookup[192] = 7;
|
||||
lookup[193] = 1;
|
||||
lookup[194] = 2;
|
||||
lookup[195] = 1;
|
||||
lookup[196] = 3;
|
||||
lookup[197] = 1;
|
||||
lookup[198] = 2;
|
||||
lookup[199] = 1;
|
||||
lookup[200] = 4;
|
||||
lookup[201] = 1;
|
||||
lookup[202] = 2;
|
||||
lookup[203] = 1;
|
||||
lookup[204] = 3;
|
||||
lookup[205] = 1;
|
||||
lookup[206] = 2;
|
||||
lookup[207] = 1;
|
||||
lookup[208] = 5;
|
||||
lookup[209] = 1;
|
||||
lookup[210] = 2;
|
||||
lookup[211] = 1;
|
||||
lookup[212] = 3;
|
||||
lookup[213] = 1;
|
||||
lookup[214] = 2;
|
||||
lookup[215] = 1;
|
||||
lookup[216] = 4;
|
||||
lookup[217] = 1;
|
||||
lookup[218] = 2;
|
||||
lookup[219] = 1;
|
||||
lookup[220] = 3;
|
||||
lookup[221] = 1;
|
||||
lookup[222] = 2;
|
||||
lookup[223] = 1;
|
||||
lookup[224] = 6;
|
||||
lookup[225] = 1;
|
||||
lookup[226] = 2;
|
||||
lookup[227] = 1;
|
||||
lookup[228] = 3;
|
||||
lookup[229] = 1;
|
||||
lookup[230] = 2;
|
||||
lookup[231] = 1;
|
||||
lookup[232] = 4;
|
||||
lookup[233] = 1;
|
||||
lookup[234] = 2;
|
||||
lookup[235] = 1;
|
||||
lookup[236] = 3;
|
||||
lookup[237] = 1;
|
||||
lookup[238] = 2;
|
||||
lookup[239] = 1;
|
||||
lookup[240] = 5;
|
||||
lookup[241] = 1;
|
||||
lookup[242] = 2;
|
||||
lookup[243] = 1;
|
||||
lookup[244] = 3;
|
||||
lookup[245] = 1;
|
||||
lookup[246] = 2;
|
||||
lookup[247] = 1;
|
||||
lookup[248] = 4;
|
||||
lookup[249] = 1;
|
||||
lookup[250] = 2;
|
||||
lookup[251] = 1;
|
||||
lookup[252] = 3;
|
||||
lookup[253] = 1;
|
||||
lookup[254] = 2;
|
||||
lookup[255] = 1;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HLLCV0 extends HyperLogLogCollector
|
||||
{
|
||||
/**
|
||||
* Header:
|
||||
* Byte 0: registerOffset
|
||||
* Byte 1-2: numNonZeroRegisters
|
||||
*/
|
||||
public static final int NUM_NON_ZERO_REGISTERS_BYTE = 1;
|
||||
public static final int HEADER_NUM_BYTES = 3;
|
||||
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
|
||||
|
||||
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();
|
||||
|
||||
public HLLCV0()
|
||||
{
|
||||
super(defaultStorageBuffer);
|
||||
}
|
||||
|
||||
public HLLCV0(ByteBuffer buffer)
|
||||
{
|
||||
super(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getVersion()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVersion(ByteBuffer buffer)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getRegisterOffset()
|
||||
{
|
||||
return getStorageBuffer().get(getInitPosition());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegisterOffset(byte registerOffset)
|
||||
{
|
||||
getStorageBuffer().put(getInitPosition(), registerOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegisterOffset(ByteBuffer buffer, byte registerOffset)
|
||||
{
|
||||
buffer.put(buffer.position(), registerOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getNumNonZeroRegisters()
|
||||
{
|
||||
return getStorageBuffer().getShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNumNonZeroRegisters(short numNonZeroRegisters)
|
||||
{
|
||||
getStorageBuffer().putShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters)
|
||||
{
|
||||
buffer.putShort(buffer.position() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getMaxOverflowValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowValue(byte value)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowValue(ByteBuffer buffer, byte value)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getMaxOverflowRegister()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowRegister(short register)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowRegister(ByteBuffer buffer, short register)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumHeaderBytes()
|
||||
{
|
||||
return HEADER_NUM_BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumBytesForDenseStorage()
|
||||
{
|
||||
return NUM_BYTES_FOR_DENSE_STORAGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPayloadBytePosition()
|
||||
{
|
||||
return getInitPosition() + HEADER_NUM_BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPayloadBytePosition(ByteBuffer buffer)
|
||||
{
|
||||
return buffer.position() + HEADER_NUM_BYTES;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HLLCV1 extends HyperLogLogCollector
|
||||
{
|
||||
/**
|
||||
* Header:
|
||||
* Byte 0: version
|
||||
* Byte 1: registerOffset
|
||||
* Byte 2-3: numNonZeroRegisters
|
||||
* Byte 4: maxOverflowValue
|
||||
* Byte 5-6: maxOverflowRegister
|
||||
*/
|
||||
public static final byte VERSION = 0x1;
|
||||
public static final int REGISTER_OFFSET_BYTE = 1;
|
||||
public static final int NUM_NON_ZERO_REGISTERS_BYTE = 2;
|
||||
public static final int MAX_OVERFLOW_VALUE_BYTE = 4;
|
||||
public static final int MAX_OVERFLOW_REGISTER_BYTE = 5;
|
||||
public static final int HEADER_NUM_BYTES = 7;
|
||||
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
|
||||
|
||||
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
|
||||
.asReadOnlyBuffer();
|
||||
|
||||
public HLLCV1()
|
||||
{
|
||||
super(defaultStorageBuffer);
|
||||
}
|
||||
|
||||
public HLLCV1(ByteBuffer buffer)
|
||||
{
|
||||
super(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getVersion()
|
||||
{
|
||||
return VERSION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVersion(ByteBuffer buffer)
|
||||
{
|
||||
buffer.put(buffer.position(), VERSION);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getRegisterOffset()
|
||||
{
|
||||
return getStorageBuffer().get(getInitPosition() + REGISTER_OFFSET_BYTE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegisterOffset(byte registerOffset)
|
||||
{
|
||||
getStorageBuffer().put(getInitPosition() + REGISTER_OFFSET_BYTE, registerOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegisterOffset(ByteBuffer buffer, byte registerOffset)
|
||||
{
|
||||
buffer.put(buffer.position() + REGISTER_OFFSET_BYTE, registerOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getNumNonZeroRegisters()
|
||||
{
|
||||
return getStorageBuffer().getShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNumNonZeroRegisters(short numNonZeroRegisters)
|
||||
{
|
||||
getStorageBuffer().putShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters)
|
||||
{
|
||||
buffer.putShort(buffer.position() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getMaxOverflowValue()
|
||||
{
|
||||
return getStorageBuffer().get(getInitPosition() + MAX_OVERFLOW_VALUE_BYTE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowValue(byte value)
|
||||
{
|
||||
getStorageBuffer().put(getInitPosition() + MAX_OVERFLOW_VALUE_BYTE, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowValue(ByteBuffer buffer, byte value)
|
||||
{
|
||||
buffer.put(buffer.position() + MAX_OVERFLOW_VALUE_BYTE, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getMaxOverflowRegister()
|
||||
{
|
||||
return getStorageBuffer().getShort(getInitPosition() + MAX_OVERFLOW_REGISTER_BYTE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowRegister(short register)
|
||||
{
|
||||
getStorageBuffer().putShort(getInitPosition() + MAX_OVERFLOW_REGISTER_BYTE, register);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxOverflowRegister(ByteBuffer buffer, short register)
|
||||
{
|
||||
buffer.putShort(buffer.position() + MAX_OVERFLOW_REGISTER_BYTE, register);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumHeaderBytes()
|
||||
{
|
||||
return HEADER_NUM_BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumBytesForDenseStorage()
|
||||
{
|
||||
return NUM_BYTES_FOR_DENSE_STORAGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPayloadBytePosition()
|
||||
{
|
||||
return getInitPosition() + HEADER_NUM_BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPayloadBytePosition(ByteBuffer buffer)
|
||||
{
|
||||
return buffer.position() + HEADER_NUM_BYTES;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,654 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import com.google.common.primitives.UnsignedBytes;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Implements the HyperLogLog cardinality estimator described in:
|
||||
* <p/>
|
||||
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
|
||||
* <p/>
|
||||
* Run this code to see a simple indication of expected errors based on different m values:
|
||||
* <p/>
|
||||
* for (int i = 1; i < 20; ++i) {
|
||||
* System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
|
||||
* }
|
||||
* <p/>
|
||||
* This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
|
||||
* only one thread is ever calling methods on it.
|
||||
* <p/>
|
||||
* If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
|
||||
*/
|
||||
public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
|
||||
{
|
||||
public static final int DENSE_THRESHOLD = 128;
|
||||
public static final int BITS_FOR_BUCKETS = 11;
|
||||
public static final int NUM_BUCKETS = 1 << BITS_FOR_BUCKETS;
|
||||
public static final int NUM_BYTES_FOR_BUCKETS = NUM_BUCKETS / 2;
|
||||
|
||||
private static final double TWO_TO_THE_SIXTY_FOUR = Math.pow(2, 64);
|
||||
private static final double ALPHA = 0.7213 / (1 + 1.079 / NUM_BUCKETS);
|
||||
|
||||
public static final double LOW_CORRECTION_THRESHOLD = (5 * NUM_BUCKETS) / 2.0d;
|
||||
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
|
||||
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
|
||||
|
||||
private static final Logger log = new Logger(HyperLogLogCollector.class);
|
||||
private static final int bucketMask = 0x7ff;
|
||||
private static final int minBytesRequired = 10;
|
||||
private static final int bitsPerBucket = 4;
|
||||
private static final int range = (int) Math.pow(2, bitsPerBucket) - 1;
|
||||
|
||||
private final static double[][] minNumRegisterLookup = new double[64][256];
|
||||
|
||||
static {
|
||||
for (int registerOffset = 0; registerOffset < 64; ++registerOffset) {
|
||||
for (int register = 0; register < 256; ++register) {
|
||||
final int upper = ((register & 0xf0) >> 4) + registerOffset;
|
||||
final int lower = (register & 0x0f) + registerOffset;
|
||||
minNumRegisterLookup[registerOffset][register] = 1.0d / Math.pow(2, upper) + 1.0d / Math.pow(2, lower);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// we have to keep track of the number of zeroes in each of the two halves of the byte register (0, 1, or 2)
|
||||
private final static int[] numZeroLookup = new int[256];
|
||||
|
||||
static {
|
||||
for (int i = 0; i < numZeroLookup.length; ++i) {
|
||||
numZeroLookup[i] = (((i & 0xf0) == 0) ? 1 : 0) + (((i & 0x0f) == 0) ? 1 : 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Methods to build the latest HLLC
|
||||
public static HyperLogLogCollector makeLatestCollector()
|
||||
{
|
||||
return new HLLCV1();
|
||||
}
|
||||
|
||||
public static HyperLogLogCollector makeCollector(ByteBuffer buffer)
|
||||
{
|
||||
int remaining = buffer.remaining();
|
||||
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
|
||||
}
|
||||
|
||||
public static int getLatestNumBytesForDenseStorage()
|
||||
{
|
||||
return HLLCV1.NUM_BYTES_FOR_DENSE_STORAGE;
|
||||
}
|
||||
|
||||
public static byte[] makeEmptyVersionedByteArray()
|
||||
{
|
||||
byte[] arr = new byte[getLatestNumBytesForDenseStorage()];
|
||||
arr[0] = HLLCV1.VERSION;
|
||||
return arr;
|
||||
}
|
||||
|
||||
public static double applyCorrection(double e, int zeroCount)
|
||||
{
|
||||
e = CORRECTION_PARAMETER / e;
|
||||
|
||||
if (e <= LOW_CORRECTION_THRESHOLD) {
|
||||
return zeroCount == 0 ? e : NUM_BUCKETS * Math.log(NUM_BUCKETS / (double) zeroCount);
|
||||
}
|
||||
|
||||
if (e > HIGH_CORRECTION_THRESHOLD) {
|
||||
final double ratio = e / TWO_TO_THE_SIXTY_FOUR;
|
||||
if (ratio >= 1) {
|
||||
// handle very unlikely case that value is > 2^64
|
||||
return Double.MAX_VALUE;
|
||||
} else {
|
||||
return -TWO_TO_THE_SIXTY_FOUR * Math.log(1 - ratio);
|
||||
}
|
||||
}
|
||||
|
||||
return e;
|
||||
}
|
||||
|
||||
private static double estimateSparse(
|
||||
final ByteBuffer buf,
|
||||
final byte minNum,
|
||||
final byte overflowValue,
|
||||
final short overflowPosition,
|
||||
final boolean isUpperNibble
|
||||
)
|
||||
{
|
||||
final ByteBuffer copy = buf.asReadOnlyBuffer();
|
||||
double e = 0.0d;
|
||||
int zeroCount = NUM_BUCKETS - 2 * (buf.remaining() / 3);
|
||||
while (copy.hasRemaining()) {
|
||||
short position = copy.getShort();
|
||||
final int register = (int) copy.get() & 0xff;
|
||||
if (overflowValue != 0 && position == overflowPosition) {
|
||||
int upperNibble = ((register & 0xf0) >>> bitsPerBucket) + minNum;
|
||||
int lowerNibble = (register & 0x0f) + minNum;
|
||||
if (isUpperNibble) {
|
||||
upperNibble = Math.max(upperNibble, overflowValue);
|
||||
} else {
|
||||
lowerNibble = Math.max(lowerNibble, overflowValue);
|
||||
}
|
||||
e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble);
|
||||
zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0);
|
||||
} else {
|
||||
e += minNumRegisterLookup[minNum][register];
|
||||
zeroCount += numZeroLookup[register];
|
||||
}
|
||||
}
|
||||
|
||||
e += zeroCount;
|
||||
return applyCorrection(e, zeroCount);
|
||||
}
|
||||
|
||||
private static double estimateDense(
|
||||
final ByteBuffer buf,
|
||||
final byte minNum,
|
||||
final byte overflowValue,
|
||||
final short overflowPosition,
|
||||
final boolean isUpperNibble
|
||||
)
|
||||
{
|
||||
final ByteBuffer copy = buf.asReadOnlyBuffer();
|
||||
double e = 0.0d;
|
||||
int zeroCount = 0;
|
||||
int position = 0;
|
||||
while (copy.hasRemaining()) {
|
||||
final int register = (int) copy.get() & 0xff;
|
||||
if (overflowValue != 0 && position == overflowPosition) {
|
||||
int upperNibble = ((register & 0xf0) >>> bitsPerBucket) + minNum;
|
||||
int lowerNibble = (register & 0x0f) + minNum;
|
||||
if (isUpperNibble) {
|
||||
upperNibble = Math.max(upperNibble, overflowValue);
|
||||
} else {
|
||||
lowerNibble = Math.max(lowerNibble, overflowValue);
|
||||
}
|
||||
e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble);
|
||||
zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0);
|
||||
} else {
|
||||
e += minNumRegisterLookup[minNum][register];
|
||||
zeroCount += numZeroLookup[register];
|
||||
}
|
||||
position++;
|
||||
}
|
||||
|
||||
return applyCorrection(e, zeroCount);
|
||||
}
|
||||
|
||||
private static boolean isSparse(ByteBuffer buffer)
|
||||
{
|
||||
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
|
||||
}
|
||||
|
||||
private volatile ByteBuffer storageBuffer;
|
||||
private volatile int initPosition;
|
||||
private volatile Double estimatedCardinality;
|
||||
|
||||
public HyperLogLogCollector(ByteBuffer byteBuffer)
|
||||
{
|
||||
storageBuffer = byteBuffer.duplicate();
|
||||
initPosition = byteBuffer.position();
|
||||
estimatedCardinality = null;
|
||||
}
|
||||
|
||||
public abstract byte getVersion();
|
||||
|
||||
public abstract void setVersion(ByteBuffer buffer);
|
||||
|
||||
public abstract byte getRegisterOffset();
|
||||
|
||||
public abstract void setRegisterOffset(byte registerOffset);
|
||||
|
||||
public abstract void setRegisterOffset(ByteBuffer buffer, byte registerOffset);
|
||||
|
||||
public abstract short getNumNonZeroRegisters();
|
||||
|
||||
public abstract void setNumNonZeroRegisters(short numNonZeroRegisters);
|
||||
|
||||
public abstract void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters);
|
||||
|
||||
public abstract byte getMaxOverflowValue();
|
||||
|
||||
public abstract void setMaxOverflowValue(byte value);
|
||||
|
||||
public abstract void setMaxOverflowValue(ByteBuffer buffer, byte value);
|
||||
|
||||
public abstract short getMaxOverflowRegister();
|
||||
|
||||
public abstract void setMaxOverflowRegister(short register);
|
||||
|
||||
public abstract void setMaxOverflowRegister(ByteBuffer buffer, short register);
|
||||
|
||||
public abstract int getNumHeaderBytes();
|
||||
|
||||
public abstract int getNumBytesForDenseStorage();
|
||||
|
||||
public abstract int getPayloadBytePosition();
|
||||
|
||||
public abstract int getPayloadBytePosition(ByteBuffer buffer);
|
||||
|
||||
protected int getInitPosition()
|
||||
{
|
||||
return initPosition;
|
||||
}
|
||||
|
||||
protected ByteBuffer getStorageBuffer()
|
||||
{
|
||||
return storageBuffer;
|
||||
}
|
||||
|
||||
public void add(byte[] hashedValue)
|
||||
{
|
||||
if (hashedValue.length < minBytesRequired) {
|
||||
throw new IAE("Insufficient bytes, need[%d] got [%d]", minBytesRequired, hashedValue.length);
|
||||
}
|
||||
|
||||
estimatedCardinality = null;
|
||||
|
||||
final ByteBuffer buffer = ByteBuffer.wrap(hashedValue);
|
||||
|
||||
short bucket = (short) (buffer.getShort(hashedValue.length - 2) & bucketMask);
|
||||
|
||||
byte positionOf1 = 0;
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
byte lookupVal = ByteBitLookup.lookup[UnsignedBytes.toInt(hashedValue[i])];
|
||||
switch (lookupVal) {
|
||||
case 0:
|
||||
positionOf1 += 8;
|
||||
continue;
|
||||
default:
|
||||
positionOf1 += lookupVal;
|
||||
i = 8;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
add(bucket, positionOf1);
|
||||
}
|
||||
|
||||
public void add(short bucket, byte positionOf1)
|
||||
{
|
||||
if (storageBuffer.isReadOnly()) {
|
||||
convertToMutableByteBuffer();
|
||||
}
|
||||
|
||||
byte registerOffset = getRegisterOffset();
|
||||
|
||||
// discard everything outside of the range we care about
|
||||
if (positionOf1 <= registerOffset) {
|
||||
return;
|
||||
} else if (positionOf1 > (registerOffset + range)) {
|
||||
byte currMax = getMaxOverflowValue();
|
||||
if (positionOf1 > currMax) {
|
||||
setMaxOverflowValue(positionOf1);
|
||||
setMaxOverflowRegister(bucket);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// whatever value we add must be stored in 4 bits
|
||||
short numNonZeroRegisters = addNibbleRegister(bucket, (byte) ((0xff & positionOf1) - registerOffset));
|
||||
setNumNonZeroRegisters(numNonZeroRegisters);
|
||||
if (numNonZeroRegisters == NUM_BUCKETS) {
|
||||
setRegisterOffset(++registerOffset);
|
||||
setNumNonZeroRegisters(decrementBuckets());
|
||||
}
|
||||
}
|
||||
|
||||
public HyperLogLogCollector fold(HyperLogLogCollector other)
|
||||
{
|
||||
if (other == null || other.storageBuffer.remaining() == 0) {
|
||||
return this;
|
||||
}
|
||||
|
||||
if (storageBuffer.isReadOnly()) {
|
||||
convertToMutableByteBuffer();
|
||||
}
|
||||
|
||||
estimatedCardinality = null;
|
||||
|
||||
if (getRegisterOffset() < other.getRegisterOffset()) {
|
||||
// "Swap" the buffers so that we are folding into the one with the higher offset
|
||||
ByteBuffer newStorage = ByteBuffer.allocate(other.storageBuffer.remaining());
|
||||
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
|
||||
newStorage.clear();
|
||||
|
||||
other.storageBuffer = storageBuffer;
|
||||
other.initPosition = initPosition;
|
||||
storageBuffer = newStorage;
|
||||
initPosition = 0;
|
||||
}
|
||||
|
||||
ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
|
||||
byte otherOffset = other.getRegisterOffset();
|
||||
|
||||
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
|
||||
convertToDenseStorage();
|
||||
}
|
||||
|
||||
byte myOffset = getRegisterOffset();
|
||||
short numNonZero = getNumNonZeroRegisters();
|
||||
|
||||
int offsetDiff = myOffset - otherOffset;
|
||||
if (offsetDiff < 0) {
|
||||
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
|
||||
}
|
||||
|
||||
byte otherOverflowValue = other.getMaxOverflowValue();
|
||||
short otherOverflowRegister = other.getMaxOverflowRegister();
|
||||
add(otherOverflowRegister, otherOverflowValue);
|
||||
|
||||
int myPayloadStart = getPayloadBytePosition();
|
||||
otherBuffer.position(other.getPayloadBytePosition());
|
||||
if (isSparse(otherBuffer)) {
|
||||
while (otherBuffer.hasRemaining()) {
|
||||
short position = otherBuffer.getShort();
|
||||
int payloadStartPosition = position - other.getNumHeaderBytes();
|
||||
numNonZero += mergeAndStoreByteRegister(
|
||||
myPayloadStart + payloadStartPosition,
|
||||
offsetDiff,
|
||||
otherBuffer.get()
|
||||
);
|
||||
if (numNonZero == NUM_BUCKETS) {
|
||||
myOffset += 1;
|
||||
numNonZero = decrementBuckets();
|
||||
setRegisterOffset(myOffset);
|
||||
setNumNonZeroRegisters(numNonZero);
|
||||
|
||||
offsetDiff = myOffset - otherOffset;
|
||||
}
|
||||
}
|
||||
} else { // dense
|
||||
int position = getPayloadBytePosition();
|
||||
while (otherBuffer.hasRemaining()) {
|
||||
numNonZero += mergeAndStoreByteRegister(
|
||||
position,
|
||||
offsetDiff,
|
||||
otherBuffer.get()
|
||||
);
|
||||
if (numNonZero == NUM_BUCKETS) {
|
||||
myOffset += 1;
|
||||
numNonZero = decrementBuckets();
|
||||
setRegisterOffset(myOffset);
|
||||
setNumNonZeroRegisters(numNonZero);
|
||||
|
||||
offsetDiff = myOffset - otherOffset;
|
||||
}
|
||||
position++;
|
||||
}
|
||||
}
|
||||
|
||||
setRegisterOffset(myOffset);
|
||||
setNumNonZeroRegisters(numNonZero);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public HyperLogLogCollector fold(ByteBuffer buffer)
|
||||
{
|
||||
return fold(makeCollector(buffer));
|
||||
}
|
||||
|
||||
public ByteBuffer toByteBuffer()
|
||||
{
|
||||
short numNonZeroRegisters = getNumNonZeroRegisters();
|
||||
|
||||
// store sparsely
|
||||
if (storageBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
|
||||
ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
|
||||
setVersion(retVal);
|
||||
setRegisterOffset(retVal, getRegisterOffset());
|
||||
setNumNonZeroRegisters(retVal, numNonZeroRegisters);
|
||||
setMaxOverflowValue(retVal, getMaxOverflowValue());
|
||||
setMaxOverflowRegister(retVal, getMaxOverflowRegister());
|
||||
|
||||
int startPosition = getPayloadBytePosition();
|
||||
retVal.position(getPayloadBytePosition(retVal));
|
||||
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
|
||||
if (storageBuffer.get(i) != 0) {
|
||||
retVal.putShort((short) (0xffff & (i - initPosition)));
|
||||
retVal.put(storageBuffer.get(i));
|
||||
}
|
||||
}
|
||||
retVal.rewind();
|
||||
return retVal.asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
return storageBuffer.asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
@JsonValue
|
||||
public byte[] toByteArray()
|
||||
{
|
||||
final ByteBuffer buffer = toByteBuffer();
|
||||
byte[] theBytes = new byte[buffer.remaining()];
|
||||
buffer.get(theBytes);
|
||||
|
||||
return theBytes;
|
||||
}
|
||||
|
||||
public double estimateCardinality()
|
||||
{
|
||||
if (estimatedCardinality == null) {
|
||||
byte registerOffset = getRegisterOffset();
|
||||
byte overflowValue = getMaxOverflowValue();
|
||||
short overflowRegister = getMaxOverflowRegister();
|
||||
short overflowPosition = (short) (overflowRegister >>> 1);
|
||||
boolean isUpperNibble = ((overflowRegister & 0x1) == 0);
|
||||
|
||||
storageBuffer.position(getPayloadBytePosition());
|
||||
|
||||
if (isSparse(storageBuffer)) {
|
||||
estimatedCardinality = estimateSparse(
|
||||
storageBuffer,
|
||||
registerOffset,
|
||||
overflowValue,
|
||||
overflowPosition,
|
||||
isUpperNibble
|
||||
);
|
||||
} else {
|
||||
estimatedCardinality = estimateDense(
|
||||
storageBuffer,
|
||||
registerOffset,
|
||||
overflowValue,
|
||||
overflowPosition,
|
||||
isUpperNibble
|
||||
);
|
||||
}
|
||||
|
||||
storageBuffer.position(initPosition);
|
||||
}
|
||||
return estimatedCardinality;
|
||||
}
|
||||
|
||||
public double estimateByteBuffer(ByteBuffer buf)
|
||||
{
|
||||
return makeCollector(buf).estimateCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HyperLogLogCollector collector = (HyperLogLogCollector) o;
|
||||
|
||||
if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = storageBuffer != null ? storageBuffer.hashCode() : 0;
|
||||
result = 31 * result + initPosition;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "HyperLogLogCollector{" +
|
||||
"initPosition=" + initPosition +
|
||||
", version=" + getVersion() +
|
||||
", registerOffset=" + getRegisterOffset() +
|
||||
", numNonZeroRegisters=" + getNumNonZeroRegisters() +
|
||||
", maxOverflowValue=" + getMaxOverflowValue() +
|
||||
", maxOverflowRegister=" + getMaxOverflowRegister() +
|
||||
'}';
|
||||
}
|
||||
|
||||
private short decrementBuckets()
|
||||
{
|
||||
short count = 0;
|
||||
int startPosition = getPayloadBytePosition();
|
||||
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
|
||||
byte val = (byte) (storageBuffer.get(i) - 0x11);
|
||||
if ((val & 0xf0) != 0) {
|
||||
count++;
|
||||
}
|
||||
if ((val & 0x0f) != 0) {
|
||||
count++;
|
||||
}
|
||||
storageBuffer.put(i, val);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private void convertToMutableByteBuffer()
|
||||
{
|
||||
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
|
||||
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
|
||||
tmpBuffer.position(0);
|
||||
storageBuffer = tmpBuffer;
|
||||
initPosition = 0;
|
||||
}
|
||||
|
||||
private void convertToDenseStorage()
|
||||
{
|
||||
ByteBuffer tmpBuffer = ByteBuffer.wrap(new byte[getNumBytesForDenseStorage()]);
|
||||
// put header
|
||||
setVersion(tmpBuffer);
|
||||
setRegisterOffset(tmpBuffer, getRegisterOffset());
|
||||
setNumNonZeroRegisters(tmpBuffer, getNumNonZeroRegisters());
|
||||
setMaxOverflowValue(tmpBuffer, getMaxOverflowValue());
|
||||
setMaxOverflowRegister(tmpBuffer, getMaxOverflowRegister());
|
||||
|
||||
storageBuffer.position(getPayloadBytePosition());
|
||||
tmpBuffer.position(getPayloadBytePosition(tmpBuffer));
|
||||
// put payload
|
||||
while (storageBuffer.hasRemaining()) {
|
||||
tmpBuffer.put(storageBuffer.getShort(), storageBuffer.get());
|
||||
}
|
||||
tmpBuffer.rewind();
|
||||
storageBuffer = tmpBuffer;
|
||||
initPosition = 0;
|
||||
}
|
||||
|
||||
private short addNibbleRegister(short bucket, byte positionOf1)
|
||||
{
|
||||
short numNonZeroRegs = getNumNonZeroRegisters();
|
||||
final short position = (short) (bucket >> 1);
|
||||
final boolean isUpperNibble = ((bucket & 0x1) == 0);
|
||||
|
||||
byte shiftedPositionOf1 = (isUpperNibble) ? (byte) (positionOf1 << bitsPerBucket) : positionOf1;
|
||||
|
||||
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
|
||||
convertToDenseStorage();
|
||||
}
|
||||
|
||||
byte origVal = storageBuffer.get(getPayloadBytePosition() + position);
|
||||
byte newValueMask = (isUpperNibble) ? (byte) 0xf0 : (byte) 0x0f;
|
||||
byte originalValueMask = (byte) (newValueMask ^ 0xff);
|
||||
|
||||
// if something was at zero, we have to increase the numNonZeroRegisters
|
||||
if ((origVal & newValueMask) == 0 && shiftedPositionOf1 != 0) {
|
||||
numNonZeroRegs++;
|
||||
}
|
||||
|
||||
storageBuffer.put(
|
||||
getPayloadBytePosition() + position,
|
||||
(byte) (UnsignedBytes.max((byte) (origVal & newValueMask), shiftedPositionOf1) | (origVal & originalValueMask))
|
||||
);
|
||||
|
||||
return numNonZeroRegs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of registers that are no longer zero after the value was added
|
||||
*
|
||||
* @param position The position into the byte buffer, this position represents two "registers"
|
||||
* @param offsetDiff The difference in offset between the byteToAdd and the current HyperLogLogCollector
|
||||
* @param byteToAdd The byte to merge into the current HyperLogLogCollector
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
private int mergeAndStoreByteRegister(
|
||||
int position,
|
||||
int offsetDiff,
|
||||
byte byteToAdd
|
||||
)
|
||||
{
|
||||
if (byteToAdd == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
byte currVal = storageBuffer.get(position);
|
||||
|
||||
int upperNibble = currVal & 0xf0;
|
||||
int lowerNibble = currVal & 0x0f;
|
||||
|
||||
// subtract the differences so that the nibbles align
|
||||
int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
|
||||
int otherLower = (byteToAdd & 0x0f) - offsetDiff;
|
||||
|
||||
final int newUpper = Math.max(upperNibble, otherUpper);
|
||||
final int newLower = Math.max(lowerNibble, otherLower);
|
||||
|
||||
int numNoLongerZero = 0;
|
||||
if (upperNibble == 0 && newUpper > 0) {
|
||||
++numNoLongerZero;
|
||||
}
|
||||
|
||||
if (lowerNibble == 0 && newLower > 0) {
|
||||
++numNoLongerZero;
|
||||
}
|
||||
|
||||
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
|
||||
|
||||
return numNoLongerZero;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(HyperLogLogCollector other)
|
||||
{
|
||||
final int lhsOffset = (int) this.getRegisterOffset() & 0xffff;
|
||||
final int rhsOffset = (int) other.getRegisterOffset() & 0xffff;
|
||||
|
||||
if (lhsOffset == rhsOffset) {
|
||||
final int lhsNumNonZero = (int) this.getNumNonZeroRegisters() & 0xff;
|
||||
final int rhsNumNonZero = (int) this.getNumNonZeroRegisters() & 0xff;
|
||||
int retVal = Double.compare(lhsNumNonZero, rhsNumNonZero);
|
||||
|
||||
if (retVal == 0) {
|
||||
retVal = Double.compare(this.estimateCardinality(), other.estimateCardinality());
|
||||
}
|
||||
|
||||
return retVal;
|
||||
} else {
|
||||
return Double.compare(lhsOffset, rhsOffset);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.Sets;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperUniqueFinalizingPostAggregator implements PostAggregator
|
||||
{
|
||||
private final String fieldName;
|
||||
|
||||
@JsonCreator
|
||||
public HyperUniqueFinalizingPostAggregator(
|
||||
@JsonProperty("fieldName") String fieldName
|
||||
)
|
||||
{
|
||||
this.fieldName = fieldName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getDependentFields()
|
||||
{
|
||||
return Sets.newHashSet(fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object compute(Map<String, Object> combinedAggregators)
|
||||
{
|
||||
return HyperUniquesAggregatorFactory.estimateCardinality(combinedAggregators.get(fieldName));
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty("fieldName")
|
||||
public String getName()
|
||||
{
|
||||
return fieldName;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperUniquesAggregator implements Aggregator
|
||||
{
|
||||
private final String name;
|
||||
private final ObjectColumnSelector selector;
|
||||
|
||||
private HyperLogLogCollector collector;
|
||||
|
||||
public HyperUniquesAggregator(
|
||||
String name,
|
||||
ObjectColumnSelector selector
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
this.collector = HyperLogLogCollector.makeLatestCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
collector.fold((HyperLogLogCollector) selector.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
collector = HyperLogLogCollector.makeLatestCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return collector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new HyperUniquesAggregator(name, selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// no resources to cleanup
|
||||
}
|
||||
}
|
|
@ -0,0 +1,191 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.metamx.common.IAE;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.NoopAggregator;
|
||||
import io.druid.query.aggregation.NoopBufferAggregator;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperUniquesAggregatorFactory implements AggregatorFactory
|
||||
{
|
||||
public static Object estimateCardinality(Object object)
|
||||
{
|
||||
if (object == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return ((HyperLogLogCollector) object).estimateCardinality();
|
||||
}
|
||||
|
||||
private static final byte CACHE_TYPE_ID = 0x5;
|
||||
|
||||
private final String name;
|
||||
private final String fieldName;
|
||||
|
||||
@JsonCreator
|
||||
public HyperUniquesAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") String fieldName
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.fieldName = fieldName.toLowerCase();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
|
||||
|
||||
if (selector == null) {
|
||||
return new NoopAggregator(name);
|
||||
}
|
||||
|
||||
if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
|
||||
return new HyperUniquesAggregator(name, selector);
|
||||
}
|
||||
|
||||
throw new IAE(
|
||||
"Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
|
||||
|
||||
if (selector == null) {
|
||||
return new NoopBufferAggregator();
|
||||
}
|
||||
|
||||
if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
|
||||
return new HyperUniquesBufferAggregator(selector);
|
||||
}
|
||||
|
||||
throw new IAE(
|
||||
"Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
return new Comparator<HyperLogLogCollector>()
|
||||
{
|
||||
@Override
|
||||
public int compare(HyperLogLogCollector lhs, HyperLogLogCollector rhs)
|
||||
{
|
||||
return lhs.compareTo(rhs);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs)
|
||||
{
|
||||
if (rhs == null) {
|
||||
return lhs;
|
||||
}
|
||||
if (lhs == null) {
|
||||
return rhs;
|
||||
}
|
||||
return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return new HyperUniquesAggregatorFactory(name, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object)
|
||||
{
|
||||
if (object instanceof byte[]) {
|
||||
return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
|
||||
} else if (object instanceof ByteBuffer) {
|
||||
return HyperLogLogCollector.makeCollector((ByteBuffer) object);
|
||||
} else if (object instanceof String) {
|
||||
return HyperLogLogCollector.makeCollector(
|
||||
ByteBuffer.wrap(Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8)))
|
||||
);
|
||||
}
|
||||
return object;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return estimateCardinality(object);
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields()
|
||||
{
|
||||
return Arrays.asList(fieldName);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFieldName()
|
||||
{
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);
|
||||
|
||||
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return "hyperUnique";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize()
|
||||
{
|
||||
return HyperLogLogCollector.getLatestNumBytesForDenseStorage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return HyperLogLogCollector.makeLatestCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "HyperUniquesAggregatorFactory{" +
|
||||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperUniquesBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
|
||||
private final ObjectColumnSelector selector;
|
||||
|
||||
public HyperUniquesBufferAggregator(
|
||||
ObjectColumnSelector selector
|
||||
)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
final ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
mutationBuffer.put(EMPTY_BYTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
HyperLogLogCollector collector = (HyperLogLogCollector) selector.get();
|
||||
|
||||
if (collector == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
HyperLogLogCollector.makeCollector(
|
||||
(ByteBuffer) buf.duplicate().position(position).limit(
|
||||
position
|
||||
+ HyperLogLogCollector.getLatestNumBytesForDenseStorage()
|
||||
)
|
||||
).fold(collector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
ByteBuffer dataCopyBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
mutationBuffer.get(dataCopyBuffer.array());
|
||||
return HyperLogLogCollector.makeCollector(dataCopyBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// no resources to cleanup
|
||||
}
|
||||
}
|
|
@ -0,0 +1,129 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.data.GenericIndexed;
|
||||
import io.druid.segment.data.ObjectStrategy;
|
||||
import io.druid.segment.serde.ColumnPartSerde;
|
||||
import io.druid.segment.serde.ComplexColumnPartSerde;
|
||||
import io.druid.segment.serde.ComplexColumnPartSupplier;
|
||||
import io.druid.segment.serde.ComplexMetricExtractor;
|
||||
import io.druid.segment.serde.ComplexMetricSerde;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperUniquesSerde extends ComplexMetricSerde
|
||||
{
|
||||
private static Ordering<HyperLogLogCollector> comparator = new Ordering<HyperLogLogCollector>()
|
||||
{
|
||||
@Override
|
||||
public int compare(
|
||||
HyperLogLogCollector arg1, HyperLogLogCollector arg2
|
||||
)
|
||||
{
|
||||
return arg1.toByteBuffer().compareTo(arg2.toByteBuffer());
|
||||
}
|
||||
}.nullsFirst();
|
||||
|
||||
private final HashFunction hashFn;
|
||||
|
||||
public HyperUniquesSerde(
|
||||
HashFunction hashFn
|
||||
)
|
||||
{
|
||||
this.hashFn = hashFn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return "hyperUnique";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComplexMetricExtractor getExtractor()
|
||||
{
|
||||
return new ComplexMetricExtractor()
|
||||
{
|
||||
@Override
|
||||
public Class<HyperLogLogCollector> extractedClass()
|
||||
{
|
||||
return HyperLogLogCollector.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HyperLogLogCollector extractValue(InputRow inputRow, String metricName)
|
||||
{
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
List<String> dimValues = inputRow.getDimension(metricName);
|
||||
if (dimValues == null) {
|
||||
return collector;
|
||||
}
|
||||
|
||||
for (String dimensionValue : dimValues) {
|
||||
collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes());
|
||||
}
|
||||
return collector;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnPartSerde deserializeColumn(
|
||||
ByteBuffer byteBuffer, ColumnBuilder columnBuilder
|
||||
)
|
||||
{
|
||||
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
|
||||
|
||||
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
|
||||
|
||||
return new ComplexColumnPartSerde(column, getTypeName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectStrategy getObjectStrategy()
|
||||
{
|
||||
return new ObjectStrategy<HyperLogLogCollector>()
|
||||
{
|
||||
@Override
|
||||
public Class<? extends HyperLogLogCollector> getClazz()
|
||||
{
|
||||
return HyperLogLogCollector.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
buffer.limit(buffer.position() + numBytes);
|
||||
|
||||
int remaining = buffer.remaining();
|
||||
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes(HyperLogLogCollector collector)
|
||||
{
|
||||
if (collector == null) {
|
||||
return new byte[]{};
|
||||
}
|
||||
ByteBuffer val = collector.toByteBuffer();
|
||||
byte[] retVal = new byte[val.remaining()];
|
||||
val.asReadOnlyBuffer().get(retVal);
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(HyperLogLogCollector o1, HyperLogLogCollector o2)
|
||||
{
|
||||
return comparator.compare(o1, o2);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import io.druid.query.aggregation.post.ConstantPostAggregator;
|
||||
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||
|
@ -48,7 +49,7 @@ import java.util.List;
|
|||
*/
|
||||
public class QueryRunnerTestHelper
|
||||
{
|
||||
public static final String segmentId= "testSegment";
|
||||
public static final String segmentId = "testSegment";
|
||||
public static final String dataSource = "testing";
|
||||
public static final QueryGranularity dayGran = QueryGranularity.DAY;
|
||||
public static final QueryGranularity allGran = QueryGranularity.ALL;
|
||||
|
@ -57,9 +58,15 @@ public class QueryRunnerTestHelper
|
|||
public static final String placementDimension = "placement";
|
||||
public static final String placementishDimension = "placementish";
|
||||
public static final String indexMetric = "index";
|
||||
public static final String uniqueMetric = "uniques";
|
||||
public static final String addRowsIndexConstantMetric = "addRowsIndexConstant";
|
||||
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
||||
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
|
||||
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
|
||||
public static final HyperUniquesAggregatorFactory qualityUniques = new HyperUniquesAggregatorFactory(
|
||||
"uniques",
|
||||
"quality_uniques"
|
||||
);
|
||||
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
|
||||
public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
|
||||
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
||||
|
@ -67,7 +74,15 @@ public class QueryRunnerTestHelper
|
|||
new ArithmeticPostAggregator(
|
||||
"addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
|
||||
);
|
||||
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(rowsCount, indexDoubleSum);
|
||||
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(
|
||||
rowsCount,
|
||||
indexDoubleSum,
|
||||
qualityUniques
|
||||
);
|
||||
|
||||
public static final double UNIQUES_9 = 9.019833517963864;
|
||||
public static final double UNIQUES_2 = 2.000977198748901d;
|
||||
public static final double UNIQUES_1 = 1.0002442201269182d;
|
||||
|
||||
public static final String[] expectedFullOnIndexValues = new String[]{
|
||||
"4500.0", "6077.949111938477", "4922.488838195801", "5726.140853881836", "4698.468170166016",
|
||||
|
|
|
@ -0,0 +1,774 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperLogLogCollectorTest
|
||||
{
|
||||
|
||||
private final HashFunction fn = Hashing.murmur3_128();
|
||||
private final Random random = new Random();
|
||||
|
||||
@Test
|
||||
public void testFolding() throws Exception
|
||||
{
|
||||
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
|
||||
for (int numThings : numValsToCheck) {
|
||||
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < numThings; ++i) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
|
||||
allCombined.add(hashedVal);
|
||||
if (i % 2 == 0) {
|
||||
oneHalf.add(hashedVal);
|
||||
} else {
|
||||
otherHalf.add(hashedVal);
|
||||
}
|
||||
}
|
||||
|
||||
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
folded.fold(oneHalf);
|
||||
Assert.assertEquals(oneHalf, folded);
|
||||
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
|
||||
folded.fold(otherHalf);
|
||||
Assert.assertEquals(allCombined, folded);
|
||||
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
}
|
||||
}
|
||||
|
||||
// @Test
|
||||
public void testHighCardinalityRollingFold() throws Exception
|
||||
{
|
||||
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
|
||||
final HyperLogLogCollector simple = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
int count;
|
||||
MessageDigest md = MessageDigest.getInstance("SHA-1");
|
||||
HyperLogLogCollector tmp = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (count = 0; count < 5000000; ++count) {
|
||||
md.update(Integer.toString(count).getBytes());
|
||||
|
||||
byte[] hashed = fn.hashBytes(md.digest()).asBytes();
|
||||
|
||||
tmp.add(hashed);
|
||||
simple.add(hashed);
|
||||
|
||||
if (count % 100 == 0) {
|
||||
rolling.fold(tmp);
|
||||
tmp = HyperLogLogCollector.makeLatestCollector();
|
||||
}
|
||||
}
|
||||
|
||||
int n = count;
|
||||
|
||||
System.out.println("True cardinality " + n);
|
||||
System.out.println("Rolling buffer cardinality " + rolling.estimateCardinality());
|
||||
System.out.println("Simple buffer cardinality " + simple.estimateCardinality());
|
||||
System.out.println(
|
||||
String.format(
|
||||
"Rolling cardinality estimate off by %4.1f%%",
|
||||
100 * (1 - rolling.estimateCardinality() / n)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(n, simple.estimateCardinality(), n * 0.05);
|
||||
Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05);
|
||||
}
|
||||
|
||||
//@Test
|
||||
public void testHighCardinalityRollingFold2() throws Exception
|
||||
{
|
||||
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
|
||||
int count;
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
for (count = 0; count < 5000000; ++count) {
|
||||
HyperLogLogCollector theCollector = HyperLogLogCollector.makeLatestCollector();
|
||||
theCollector.add(fn.hashLong(count).asBytes());
|
||||
rolling.fold(theCollector);
|
||||
}
|
||||
System.out.printf("testHighCardinalityRollingFold2 took %d ms%n", System.currentTimeMillis() - start);
|
||||
|
||||
int n = count;
|
||||
|
||||
System.out.println("True cardinality " + n);
|
||||
System.out.println("Rolling buffer cardinality " + rolling.estimateCardinality());
|
||||
System.out.println(
|
||||
String.format(
|
||||
"Rolling cardinality estimate off by %4.1f%%",
|
||||
100 * (1 - rolling.estimateCardinality() / n)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldingByteBuffers() throws Exception
|
||||
{
|
||||
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
|
||||
for (int numThings : numValsToCheck) {
|
||||
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < numThings; ++i) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
|
||||
allCombined.add(hashedVal);
|
||||
if (i % 2 == 0) {
|
||||
oneHalf.add(hashedVal);
|
||||
} else {
|
||||
otherHalf.add(hashedVal);
|
||||
}
|
||||
}
|
||||
|
||||
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
folded.fold(oneHalf.toByteBuffer());
|
||||
Assert.assertEquals(oneHalf, folded);
|
||||
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
|
||||
folded.fold(otherHalf.toByteBuffer());
|
||||
Assert.assertEquals(allCombined, folded);
|
||||
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldingReadOnlyByteBuffers() throws Exception
|
||||
{
|
||||
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
|
||||
for (int numThings : numValsToCheck) {
|
||||
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < numThings; ++i) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
|
||||
allCombined.add(hashedVal);
|
||||
if (i % 2 == 0) {
|
||||
oneHalf.add(hashedVal);
|
||||
} else {
|
||||
otherHalf.add(hashedVal);
|
||||
}
|
||||
}
|
||||
|
||||
HyperLogLogCollector folded = HyperLogLogCollector.makeCollector(
|
||||
ByteBuffer.wrap(HyperLogLogCollector.makeEmptyVersionedByteArray())
|
||||
.asReadOnlyBuffer()
|
||||
);
|
||||
|
||||
folded.fold(oneHalf.toByteBuffer());
|
||||
Assert.assertEquals(oneHalf, folded);
|
||||
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
|
||||
folded.fold(otherHalf.toByteBuffer());
|
||||
Assert.assertEquals(allCombined, folded);
|
||||
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldingReadOnlyByteBuffersWithArbitraryPosition() throws Exception
|
||||
{
|
||||
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
|
||||
for (int numThings : numValsToCheck) {
|
||||
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector oneHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector otherHalf = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < numThings; ++i) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
|
||||
allCombined.add(hashedVal);
|
||||
if (i % 2 == 0) {
|
||||
oneHalf.add(hashedVal);
|
||||
} else {
|
||||
otherHalf.add(hashedVal);
|
||||
}
|
||||
}
|
||||
|
||||
HyperLogLogCollector folded = HyperLogLogCollector.makeCollector(
|
||||
shiftedBuffer(
|
||||
ByteBuffer.wrap(HyperLogLogCollector.makeEmptyVersionedByteArray())
|
||||
.asReadOnlyBuffer(),
|
||||
17
|
||||
)
|
||||
);
|
||||
|
||||
folded.fold(oneHalf.toByteBuffer());
|
||||
Assert.assertEquals(oneHalf, folded);
|
||||
Assert.assertEquals(oneHalf.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
|
||||
folded.fold(otherHalf.toByteBuffer());
|
||||
Assert.assertEquals(allCombined, folded);
|
||||
Assert.assertEquals(allCombined.estimateCardinality(), folded.estimateCardinality(), 0.0d);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldWithDifferentOffsets1() throws Exception
|
||||
{
|
||||
ByteBuffer biggerOffset = makeCollectorBuffer(1, (byte) 0x00, 0x11);
|
||||
ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00);
|
||||
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(biggerOffset);
|
||||
collector.fold(smallerOffset);
|
||||
|
||||
ByteBuffer outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 1);
|
||||
Assert.assertEquals(outBuffer.getShort(), 2047);
|
||||
outBuffer.get();
|
||||
outBuffer.getShort();
|
||||
Assert.assertEquals(outBuffer.get(), 0x10);
|
||||
while (outBuffer.hasRemaining()) {
|
||||
Assert.assertEquals(outBuffer.get(), 0x11);
|
||||
}
|
||||
|
||||
collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(smallerOffset);
|
||||
collector.fold(biggerOffset);
|
||||
|
||||
outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 1);
|
||||
Assert.assertEquals(outBuffer.getShort(), 2047);
|
||||
Assert.assertEquals(outBuffer.get(), 0);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
Assert.assertEquals(outBuffer.get(), 0x10);
|
||||
while (outBuffer.hasRemaining()) {
|
||||
Assert.assertEquals(outBuffer.get(), 0x11);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldWithArbitraryInitialPositions() throws Exception
|
||||
{
|
||||
ByteBuffer biggerOffset = shiftedBuffer(makeCollectorBuffer(1, (byte) 0x00, 0x11), 10);
|
||||
ByteBuffer smallerOffset = shiftedBuffer(makeCollectorBuffer(0, (byte) 0x20, 0x00), 15);
|
||||
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(biggerOffset);
|
||||
collector.fold(smallerOffset);
|
||||
|
||||
ByteBuffer outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 1);
|
||||
Assert.assertEquals(outBuffer.getShort(), 2047);
|
||||
outBuffer.get();
|
||||
outBuffer.getShort();
|
||||
Assert.assertEquals(outBuffer.get(), 0x10);
|
||||
while (outBuffer.hasRemaining()) {
|
||||
Assert.assertEquals(outBuffer.get(), 0x11);
|
||||
}
|
||||
|
||||
collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(smallerOffset);
|
||||
collector.fold(biggerOffset);
|
||||
|
||||
outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 1);
|
||||
Assert.assertEquals(outBuffer.getShort(), 2047);
|
||||
outBuffer.get();
|
||||
outBuffer.getShort();
|
||||
Assert.assertEquals(outBuffer.get(), 0x10);
|
||||
while (outBuffer.hasRemaining()) {
|
||||
Assert.assertEquals(outBuffer.get(), 0x11);
|
||||
}
|
||||
}
|
||||
|
||||
protected ByteBuffer shiftedBuffer(ByteBuffer buf, int offset)
|
||||
{
|
||||
ByteBuffer shifted = ByteBuffer.allocate(buf.remaining() + offset);
|
||||
shifted.position(offset);
|
||||
shifted.put(buf);
|
||||
shifted.position(offset);
|
||||
return shifted;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldWithDifferentOffsets2() throws Exception
|
||||
{
|
||||
ByteBuffer biggerOffset = makeCollectorBuffer(1, (byte) 0x01, 0x11);
|
||||
ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00);
|
||||
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(biggerOffset);
|
||||
collector.fold(smallerOffset);
|
||||
|
||||
ByteBuffer outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 2);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
outBuffer.get();
|
||||
outBuffer.getShort();
|
||||
Assert.assertFalse(outBuffer.hasRemaining());
|
||||
|
||||
collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(smallerOffset);
|
||||
collector.fold(biggerOffset);
|
||||
|
||||
outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 2);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
outBuffer.get();
|
||||
outBuffer.getShort();
|
||||
Assert.assertFalse(outBuffer.hasRemaining());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldWithUpperNibbleTriggersOffsetChange() throws Exception
|
||||
{
|
||||
byte[] arr1 = new byte[HyperLogLogCollector.getLatestNumBytesForDenseStorage()];
|
||||
Arrays.fill(arr1, (byte) 0x11);
|
||||
ByteBuffer buffer1 = ByteBuffer.wrap(arr1);
|
||||
buffer1.put(0, HLLCV1.VERSION);
|
||||
buffer1.put(1, (byte) 0);
|
||||
buffer1.putShort(2, (short) (2047));
|
||||
buffer1.put(HLLCV1.HEADER_NUM_BYTES, (byte) 0x1);
|
||||
|
||||
byte[] arr2 = new byte[HyperLogLogCollector.getLatestNumBytesForDenseStorage()];
|
||||
Arrays.fill(arr2, (byte) 0x11);
|
||||
ByteBuffer buffer2 = ByteBuffer.wrap(arr2);
|
||||
buffer2.put(0, HLLCV1.VERSION);
|
||||
buffer2.put(1, (byte) 0);
|
||||
buffer2.putShort(2, (short) (2048));
|
||||
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer1);
|
||||
collector.fold(buffer2);
|
||||
|
||||
ByteBuffer outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), HLLCV1.VERSION);
|
||||
Assert.assertEquals(outBuffer.get(), 1);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
outBuffer.get();
|
||||
outBuffer.getShort();
|
||||
Assert.assertFalse(outBuffer.hasRemaining());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSparseFoldWithDifferentOffsets1() throws Exception
|
||||
{
|
||||
ByteBuffer biggerOffset = makeCollectorBuffer(1, new byte[]{0x11, 0x10}, 0x11);
|
||||
ByteBuffer sparse = HyperLogLogCollector.makeCollector(makeCollectorBuffer(0, new byte[]{0x00, 0x02}, 0x00))
|
||||
.toByteBuffer();
|
||||
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(biggerOffset);
|
||||
collector.fold(sparse);
|
||||
|
||||
ByteBuffer outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 2);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
Assert.assertEquals(outBuffer.get(), 0);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
Assert.assertFalse(outBuffer.hasRemaining());
|
||||
|
||||
collector = HyperLogLogCollector.makeLatestCollector();
|
||||
collector.fold(sparse);
|
||||
collector.fold(biggerOffset);
|
||||
|
||||
outBuffer = collector.toByteBuffer();
|
||||
|
||||
Assert.assertEquals(outBuffer.get(), collector.getVersion());
|
||||
Assert.assertEquals(outBuffer.get(), 2);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
Assert.assertEquals(outBuffer.get(), 0);
|
||||
Assert.assertEquals(outBuffer.getShort(), 0);
|
||||
Assert.assertFalse(outBuffer.hasRemaining());
|
||||
}
|
||||
|
||||
private ByteBuffer makeCollectorBuffer(int offset, byte initialBytes, int remainingBytes)
|
||||
{
|
||||
return makeCollectorBuffer(offset, new byte[]{initialBytes}, remainingBytes);
|
||||
}
|
||||
|
||||
private ByteBuffer makeCollectorBuffer(int offset, byte[] initialBytes, int remainingBytes)
|
||||
{
|
||||
short numNonZero = 0;
|
||||
for (byte initialByte : initialBytes) {
|
||||
numNonZero += computeNumNonZero(initialByte);
|
||||
}
|
||||
|
||||
final short numNonZeroInRemaining = computeNumNonZero((byte) remainingBytes);
|
||||
numNonZero += (HyperLogLogCollector.NUM_BYTES_FOR_BUCKETS - initialBytes.length) * numNonZeroInRemaining;
|
||||
|
||||
ByteBuffer biggerOffset = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
biggerOffset.put(HLLCV1.VERSION);
|
||||
biggerOffset.put((byte) offset);
|
||||
biggerOffset.putShort(numNonZero);
|
||||
biggerOffset.put((byte) 0);
|
||||
biggerOffset.putShort((short) 0);
|
||||
biggerOffset.put(initialBytes);
|
||||
while (biggerOffset.hasRemaining()) {
|
||||
biggerOffset.put((byte) remainingBytes);
|
||||
}
|
||||
biggerOffset.clear();
|
||||
return biggerOffset.asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
private short computeNumNonZero(byte theByte)
|
||||
{
|
||||
short retVal = 0;
|
||||
if ((theByte & 0x0f) > 0) {
|
||||
++retVal;
|
||||
}
|
||||
if ((theByte & 0xf0) > 0) {
|
||||
++retVal;
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
//@Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
|
||||
public void testFoldingwithDifferentOffsets() throws Exception
|
||||
{
|
||||
for (int j = 0; j < 10; j++) {
|
||||
HyperLogLogCollector smallVals = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector bigVals = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector all = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
int numThings = 500000;
|
||||
for (int i = 0; i < numThings; i++) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
|
||||
if (i < 1000) {
|
||||
smallVals.add(hashedVal);
|
||||
} else {
|
||||
bigVals.add(hashedVal);
|
||||
}
|
||||
all.add(hashedVal);
|
||||
}
|
||||
|
||||
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
|
||||
folded.fold(smallVals);
|
||||
folded.fold(bigVals);
|
||||
final double expected = all.estimateCardinality();
|
||||
Assert.assertEquals(expected, folded.estimateCardinality(), expected * 0.025);
|
||||
Assert.assertEquals(numThings, folded.estimateCardinality(), numThings * 0.05);
|
||||
}
|
||||
}
|
||||
|
||||
//@Test
|
||||
public void testFoldingwithDifferentOffsets2() throws Exception
|
||||
{
|
||||
MessageDigest md = MessageDigest.getInstance("SHA-1");
|
||||
|
||||
for (int j = 0; j < 1; j++) {
|
||||
HyperLogLogCollector evenVals = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector oddVals = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector all = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
int numThings = 500000;
|
||||
for (int i = 0; i < numThings; i++) {
|
||||
md.update(Integer.toString(random.nextInt()).getBytes());
|
||||
byte[] hashedVal = fn.hashBytes(md.digest()).asBytes();
|
||||
|
||||
if (i % 2 == 0) {
|
||||
evenVals.add(hashedVal);
|
||||
} else {
|
||||
oddVals.add(hashedVal);
|
||||
}
|
||||
all.add(hashedVal);
|
||||
}
|
||||
|
||||
HyperLogLogCollector folded = HyperLogLogCollector.makeLatestCollector();
|
||||
folded.fold(evenVals);
|
||||
folded.fold(oddVals);
|
||||
final double expected = all.estimateCardinality();
|
||||
Assert.assertEquals(expected, folded.estimateCardinality(), expected * 0.025);
|
||||
Assert.assertEquals(numThings, folded.estimateCardinality(), numThings * 0.05);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEstimation() throws Exception
|
||||
{
|
||||
Random random = new Random(0l);
|
||||
|
||||
final int[] valsToCheck = {10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000};
|
||||
final double[] expectedVals = {
|
||||
11.029647221949576, 21.108407720752034, 51.64575281885815, 100.42231726408892,
|
||||
981.8579991802412, 1943.1337257462792, 4946.192042635218, 9935.088157579434,
|
||||
20366.1486889433, 49433.56029693898, 100615.26273314281, 980831.624899156000,
|
||||
1982408.2608981386
|
||||
};
|
||||
|
||||
int valsToCheckIndex = 0;
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
for (int i = 0; i < valsToCheck[valsToCheck.length - 1]; ++i) {
|
||||
collector.add(fn.hashLong(random.nextLong()).asBytes());
|
||||
if (i == valsToCheck[valsToCheckIndex]) {
|
||||
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
|
||||
++valsToCheckIndex;
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(expectedVals.length, valsToCheckIndex + 1);
|
||||
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEstimationReadOnlyByteBuffers() throws Exception
|
||||
{
|
||||
Random random = new Random(0l);
|
||||
|
||||
final int[] valsToCheck = {10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000};
|
||||
final double[] expectedVals = {
|
||||
11.029647221949576, 21.108407720752034, 51.64575281885815, 100.42231726408892,
|
||||
981.8579991802412, 1943.1337257462792, 4946.192042635218, 9935.088157579434,
|
||||
20366.1486889433, 49433.56029693898, 100615.26273314281, 980831.624899156000,
|
||||
1982408.2608981386
|
||||
};
|
||||
|
||||
int valsToCheckIndex = 0;
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
|
||||
ByteBuffer.allocateDirect(
|
||||
HyperLogLogCollector.getLatestNumBytesForDenseStorage()
|
||||
)
|
||||
);
|
||||
for (int i = 0; i < valsToCheck[valsToCheck.length - 1]; ++i) {
|
||||
collector.add(fn.hashLong(random.nextLong()).asBytes());
|
||||
if (i == valsToCheck[valsToCheckIndex]) {
|
||||
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
|
||||
++valsToCheckIndex;
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(expectedVals.length, valsToCheckIndex + 1);
|
||||
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEstimationLimitDifferentFromCapacity() throws Exception
|
||||
{
|
||||
Random random = new Random(0l);
|
||||
|
||||
final int[] valsToCheck = {10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000};
|
||||
final double[] expectedVals = {
|
||||
11.029647221949576, 21.108407720752034, 51.64575281885815, 100.42231726408892,
|
||||
981.8579991802412, 1943.1337257462792, 4946.192042635218, 9935.088157579434,
|
||||
20366.1486889433, 49433.56029693898, 100615.26273314281, 980831.624899156000,
|
||||
1982408.2608981386
|
||||
};
|
||||
|
||||
int valsToCheckIndex = 0;
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
|
||||
(ByteBuffer) ByteBuffer.allocate(10000)
|
||||
.position(0)
|
||||
.limit(HyperLogLogCollector.getLatestNumBytesForDenseStorage())
|
||||
);
|
||||
for (int i = 0; i < valsToCheck[valsToCheck.length - 1]; ++i) {
|
||||
collector.add(fn.hashLong(random.nextLong()).asBytes());
|
||||
if (i == valsToCheck[valsToCheckIndex]) {
|
||||
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
|
||||
++valsToCheckIndex;
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(expectedVals.length, valsToCheckIndex + 1);
|
||||
Assert.assertEquals(expectedVals[valsToCheckIndex], collector.estimateCardinality(), 0.0d);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSparseEstimation() throws Exception
|
||||
{
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
collector.add(fn.hashLong(random.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
collector.estimateCardinality(), collector.estimateByteBuffer(collector.toByteBuffer()), 0.0d
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHighBits() throws Exception
|
||||
{
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
// fill up all the buckets so we reach a registerOffset of 49
|
||||
fillBuckets(collector, (byte) 0, (byte) 49);
|
||||
|
||||
// highest possible bit position is 64
|
||||
collector.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
|
||||
Assert.assertEquals(8.5089685793441677E17, collector.estimateCardinality(), 1000);
|
||||
|
||||
// this might happen once in a million years if you hash a billion values a second
|
||||
fillBuckets(collector, (byte) 0, (byte) 63);
|
||||
collector.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
|
||||
|
||||
Assert.assertEquals(Double.MAX_VALUE, collector.estimateCardinality(), 1000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompare1() throws Exception
|
||||
{
|
||||
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
|
||||
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
|
||||
collector1.add(fn.hashLong(0).asBytes());
|
||||
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
|
||||
Comparator comparator = factory.getComparator();
|
||||
for (int i = 1; i < 100; i = i + 2) {
|
||||
collector1.add(fn.hashLong(i).asBytes());
|
||||
collector2.add(fn.hashLong(i + 1).asBytes());
|
||||
Assert.assertEquals(1, comparator.compare(collector1, collector2));
|
||||
Assert.assertEquals(1, Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompare2() throws Exception
|
||||
{
|
||||
Random rand = new Random(0);
|
||||
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
|
||||
Comparator comparator = factory.getComparator();
|
||||
for (int i = 1; i < 1000; ++i) {
|
||||
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
|
||||
int j = rand.nextInt(50);
|
||||
for (int l = 0; l < j; ++l) {
|
||||
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
|
||||
int k = j + 1 + rand.nextInt(5);
|
||||
for (int l = 0; l < k; ++l) {
|
||||
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
);
|
||||
}
|
||||
|
||||
for (int i = 1; i < 100; ++i) {
|
||||
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
|
||||
int j = rand.nextInt(500);
|
||||
for (int l = 0; l < j; ++l) {
|
||||
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
|
||||
int k = j + 2 + rand.nextInt(5);
|
||||
for (int l = 0; l < k; ++l) {
|
||||
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
);
|
||||
}
|
||||
|
||||
for (int i = 1; i < 10; ++i) {
|
||||
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
|
||||
int j = rand.nextInt(100000);
|
||||
for (int l = 0; l < j; ++l) {
|
||||
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
|
||||
int k = j + 20000 + rand.nextInt(100000);
|
||||
for (int l = 0; l < k; ++l) {
|
||||
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
|
||||
{
|
||||
byte offset = startOffset;
|
||||
while (offset <= endOffset) {
|
||||
// fill buckets to shift registerOffset
|
||||
for (short bucket = 0; bucket < 2048; ++bucket) {
|
||||
collector.add(bucket, offset);
|
||||
}
|
||||
offset++;
|
||||
}
|
||||
}
|
||||
|
||||
// Provides a nice printout of error rates as a function of cardinality
|
||||
//@Test
|
||||
public void showErrorRate() throws Exception
|
||||
{
|
||||
HashFunction fn = Hashing.murmur3_128();
|
||||
Random random = new Random();
|
||||
|
||||
double error = 0.0d;
|
||||
int count = 0;
|
||||
|
||||
final int[] valsToCheck = {
|
||||
10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000, 10000000, Integer.MAX_VALUE
|
||||
};
|
||||
|
||||
for (int numThings : valsToCheck) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < numThings; ++i) {
|
||||
if (i != 0 && i % 100000000 == 0) {
|
||||
++count;
|
||||
error = computeError(error, count, i, startTime, collector);
|
||||
}
|
||||
collector.add(fn.hashLong(random.nextLong()).asBytes());
|
||||
}
|
||||
|
||||
++count;
|
||||
error = computeError(error, count, numThings, startTime, collector);
|
||||
}
|
||||
}
|
||||
|
||||
private double computeError(double error, int count, int numThings, long startTime, HyperLogLogCollector collector)
|
||||
{
|
||||
final double estimatedValue = collector.estimateCardinality();
|
||||
final double errorThisTime = Math.abs((double) numThings - estimatedValue) / numThings;
|
||||
|
||||
error += errorThisTime;
|
||||
|
||||
System.out.printf(
|
||||
"%,d ==? %,f in %,d millis. actual error[%,f%%], avg. error [%,f%%]%n",
|
||||
numThings,
|
||||
estimatedValue,
|
||||
System.currentTimeMillis() - startTime,
|
||||
100 * errorThisTime,
|
||||
(error / count) * 100
|
||||
);
|
||||
return error;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HyperUniqueFinalizingPostAggregatorTest
|
||||
{
|
||||
private final HashFunction fn = Hashing.murmur3_128();
|
||||
|
||||
@Test
|
||||
public void testCompute() throws Exception
|
||||
{
|
||||
Random random = new Random(0l);
|
||||
HyperUniqueFinalizingPostAggregator postAggregator = new HyperUniqueFinalizingPostAggregator(
|
||||
"uniques"
|
||||
);
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
collector.add(hashedVal);
|
||||
}
|
||||
|
||||
double cardinality = (Double) postAggregator.compute(ImmutableMap.<String, Object>of("uniques", collector));
|
||||
|
||||
Assert.assertTrue(cardinality == 99.37233005831612);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HyperUniquesAggregatorFactoryTest
|
||||
{
|
||||
final static HyperUniquesAggregatorFactory aggregatorFactory = new HyperUniquesAggregatorFactory(
|
||||
"hyperUnique",
|
||||
"uniques"
|
||||
);
|
||||
final static String V0_BASE64 = "AAYbEyQwFyQVASMCVFEQQgEQIxIhM4ISAQMhUkICEDFDIBMhMgFQFAFAMjAAEhEREyVAEiUBAhIjISATMCECMiERIRIiVRFRAyIAEgFCQSMEJAITATAAEAMQgCEBEjQiAyUTAyEQASJyAGURAAISAwISATETQhAREBYDIVIlFTASAzJgERIgRCcmUyAwNAMyEJMjIhQXQhEWECABQDETATEREjIRAgEyIiMxMBQiAkBBMDYAMEQQACMzMhIkMTQSkYIRABIBADMBAhIEISAENkEBQDAxETMAIEEwEzQiQSEVQSFBBAQDICIiAVIAMTAQIQYBIRABADMDEzEAQSMkEiAYFBAQI0AmECEyQSARRTIVMhEkMiKAMCUBxUghAkIBI3EmMAQiACEAJDJCAAADOzESEDBCRjMgEUQQETQwEWIhA6MlAiAAZDI1AgEIIDUyFDIHMQEEAwIRBRABBStCZCQhAgJSMQIiQEEURTBmM1MxACIAETGhMgQnBRICNiIREyIUNAEAAkABAwQSEBJBIhIhIRERAiIRACUhEUAVMkQGEVMjECYjACBwEQQSIRIgAAEyExQUFSEAIBJCIDIDYTAgMiNBIUADUiETADMoFEADETMCIwUEQkIAESMSIzIABDERIXEhIiACQgUSEgJiQCAUARIRAREDQiEUAkQgAgQiIEAzIxRCARIgBAAVAzMAECEwE0Qh8gAAASEhEiAiMhUxcRImIVABATYyUBAwIoE1QhRDIiYBIBEBEiQSQyERAAADMAARAEACFYUwQSQBIRIgURITARFSEzEHEBACOTMREBIAMjIgEhU0cxEQIRIhIi1wEgMRUBEgMQIRAnAVASURMHQBAiEyBSAAEBQTAWQ5EQA0IUMSISAUEiASIjIhMhMFJBBSEjEAECEwACASEQFBAjARITEQIgYTEKEAeAAiMkEyARowARFBAicRISIBIxAQAgEBARMCIRQgMSIVIAkjMxIAIEMyADASMgFRIjEyKjEjBBIEQCUAARYBEQMxMCIBACNCACRCMlEzUUAAUDM1MhAjEgAxAAISAVFQECAhQAMBMhEzEgASNxAhFRIxECMRJBQAERAToBgQMhJSRQFAEhAwMiIhMQAwAgQiBQJiIGMQQhEiQxR1MiAjIAIEEiAkARECEzQlMjECIRATBgIhEBQAIQAEATEjBCMwAgMBMhAhIyFBIxQAARI1AAEABCIDFBIRUzMBIgAgEiARQCASMQQDQCFBAQAUJwMUElAyIAIRBSIRITICEAIxMAEUBEYTcBMBEEIxMREwIRIDAGIAEgYxBAEANCAhBAI2UhIiIgIRABIEVRAwNEIQERQgEFMhFCQSIAEhQDMTEQMiAjJyEQ==";
|
||||
|
||||
@Test
|
||||
public void testDeserializeV0() throws Exception
|
||||
{
|
||||
Object v0 = aggregatorFactory.deserialize(V0_BASE64);
|
||||
Assert.assertEquals("deserialized value is HLLCV0", HLLCV0.class, v0.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCombineStartValueV0() throws Exception
|
||||
{
|
||||
Object combined = aggregatorFactory.getAggregatorStartValue();
|
||||
aggregatorFactory.combine(combined, aggregatorFactory.deserialize(V0_BASE64));
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -85,7 +85,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexDoubleSum
|
||||
QueryRunnerTestHelper.indexDoubleSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -128,6 +129,11 @@ public class TimeseriesQueryRunnerTest
|
|||
value.getDoubleMetric("addRowsIndexConstant"),
|
||||
0.0
|
||||
);
|
||||
Assert.assertEquals(
|
||||
value.getDoubleMetric("uniques"),
|
||||
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0.0d : 9.0d,
|
||||
0.02
|
||||
);
|
||||
|
||||
expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis()));
|
||||
++count;
|
||||
|
@ -182,7 +188,12 @@ public class TimeseriesQueryRunnerTest
|
|||
.granularity(QueryRunnerTestHelper.dayGran)
|
||||
.filters(QueryRunnerTestHelper.providerDimension, "upfront")
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.rowsCount))
|
||||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(
|
||||
|
@ -215,6 +226,14 @@ public class TimeseriesQueryRunnerTest
|
|||
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0L : 2L,
|
||||
value.getLongMetric("rows").longValue()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
result.toString(),
|
||||
QueryRunnerTestHelper.skippedDay.equals(result.getTimestamp()) ? 0.0d : 2.0d,
|
||||
value.getDoubleMetric(
|
||||
"uniques"
|
||||
),
|
||||
0.01
|
||||
);
|
||||
|
||||
expectedEarliest = gran.toDateTime(gran.next(expectedEarliest.getMillis()));
|
||||
}
|
||||
|
@ -233,7 +252,8 @@ public class TimeseriesQueryRunnerTest
|
|||
new LongSumAggregatorFactory(
|
||||
"idx",
|
||||
"index"
|
||||
)
|
||||
),
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
@ -242,13 +262,13 @@ public class TimeseriesQueryRunnerTest
|
|||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-01"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L)
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
),
|
||||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-02"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -327,7 +347,8 @@ public class TimeseriesQueryRunnerTest
|
|||
new LongSumAggregatorFactory(
|
||||
"idx",
|
||||
"index"
|
||||
)
|
||||
),
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
@ -336,7 +357,7 @@ public class TimeseriesQueryRunnerTest
|
|||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-01"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -363,7 +384,8 @@ public class TimeseriesQueryRunnerTest
|
|||
new LongSumAggregatorFactory(
|
||||
"idx",
|
||||
"index"
|
||||
)
|
||||
),
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
@ -372,7 +394,7 @@ public class TimeseriesQueryRunnerTest
|
|||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-02"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -457,7 +479,8 @@ public class TimeseriesQueryRunnerTest
|
|||
new LongSumAggregatorFactory(
|
||||
"idx",
|
||||
"index"
|
||||
)
|
||||
),
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
@ -466,7 +489,7 @@ public class TimeseriesQueryRunnerTest
|
|||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-01"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -494,7 +517,8 @@ public class TimeseriesQueryRunnerTest
|
|||
new LongSumAggregatorFactory(
|
||||
"idx",
|
||||
"index"
|
||||
)
|
||||
),
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
@ -503,7 +527,7 @@ public class TimeseriesQueryRunnerTest
|
|||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-02"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
|
||||
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -561,7 +585,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexLongSum
|
||||
QueryRunnerTestHelper.indexLongSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -574,7 +599,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 13L,
|
||||
"index", 6619L,
|
||||
"addRowsIndexConstant", 6633.0
|
||||
"addRowsIndexConstant", 6633.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -584,7 +610,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 13L,
|
||||
"index", 5827L,
|
||||
"addRowsIndexConstant", 5841.0
|
||||
"addRowsIndexConstant", 5841.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -608,7 +635,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexLongSum
|
||||
QueryRunnerTestHelper.indexLongSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -621,7 +649,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 11L,
|
||||
"index", 3783L,
|
||||
"addRowsIndexConstant", 3795.0
|
||||
"addRowsIndexConstant", 3795.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -631,7 +660,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 11L,
|
||||
"index", 3313L,
|
||||
"addRowsIndexConstant", 3325.0
|
||||
"addRowsIndexConstant", 3325.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -655,7 +685,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexLongSum
|
||||
QueryRunnerTestHelper.indexLongSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -668,7 +699,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 9L,
|
||||
"index", 1102L,
|
||||
"addRowsIndexConstant", 1112.0
|
||||
"addRowsIndexConstant", 1112.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -678,7 +710,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 9L,
|
||||
"index", 1120L,
|
||||
"addRowsIndexConstant", 1130.0
|
||||
"addRowsIndexConstant", 1130.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -702,7 +735,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexLongSum
|
||||
QueryRunnerTestHelper.indexLongSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -715,7 +749,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 2681L,
|
||||
"addRowsIndexConstant", 2684.0
|
||||
"addRowsIndexConstant", 2684.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -725,7 +760,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 2193L,
|
||||
"addRowsIndexConstant", 2196.0
|
||||
"addRowsIndexConstant", 2196.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -749,7 +785,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexLongSum
|
||||
QueryRunnerTestHelper.indexLongSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -762,7 +799,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 2836L,
|
||||
"addRowsIndexConstant", 2839.0
|
||||
"addRowsIndexConstant", 2839.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -772,7 +810,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 2514L,
|
||||
"addRowsIndexConstant", 2517.0
|
||||
"addRowsIndexConstant", 2517.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -818,7 +857,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 254.4554443359375D,
|
||||
"addRowsIndexConstant", 257.4554443359375D
|
||||
"addRowsIndexConstant", 257.4554443359375D,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -828,7 +868,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 260.4129638671875D,
|
||||
"addRowsIndexConstant", 263.4129638671875D
|
||||
"addRowsIndexConstant", 263.4129638671875D,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -874,7 +915,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 1L,
|
||||
"index", new Float(135.885094).doubleValue(),
|
||||
"addRowsIndexConstant", new Float(137.885094).doubleValue()
|
||||
"addRowsIndexConstant", new Float(137.885094).doubleValue(),
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_1
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -884,7 +926,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 1L,
|
||||
"index", new Float(147.425935).doubleValue(),
|
||||
"addRowsIndexConstant", new Float(149.425935).doubleValue()
|
||||
"addRowsIndexConstant", new Float(149.425935).doubleValue(),
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_1
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -930,7 +973,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 1L,
|
||||
"index", new Float(118.570340).doubleValue(),
|
||||
"addRowsIndexConstant", new Float(120.570340).doubleValue()
|
||||
"addRowsIndexConstant", new Float(120.570340).doubleValue(),
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_1
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -940,7 +984,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 1L,
|
||||
"index", new Float(112.987027).doubleValue(),
|
||||
"addRowsIndexConstant", new Float(114.987027).doubleValue()
|
||||
"addRowsIndexConstant", new Float(114.987027).doubleValue(),
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_1
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -970,7 +1015,8 @@ public class TimeseriesQueryRunnerTest
|
|||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
QueryRunnerTestHelper.indexLongSum
|
||||
QueryRunnerTestHelper.indexLongSum,
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||
|
@ -983,7 +1029,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 13L,
|
||||
"index", 6619L,
|
||||
"addRowsIndexConstant", 6633.0
|
||||
"addRowsIndexConstant", 6633.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -993,7 +1040,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 13L,
|
||||
"index", 5827L,
|
||||
"addRowsIndexConstant", 5841.0
|
||||
"addRowsIndexConstant", 5841.0,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -1043,7 +1091,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 254.4554443359375D,
|
||||
"addRowsIndexConstant", 257.4554443359375D
|
||||
"addRowsIndexConstant", 257.4554443359375D,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -1053,7 +1102,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 2L,
|
||||
"index", 260.4129638671875D,
|
||||
"addRowsIndexConstant", 263.4129638671875D
|
||||
"addRowsIndexConstant", 263.4129638671875D,
|
||||
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -1085,7 +1135,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 0L,
|
||||
"index", 0.0,
|
||||
"addRowsIndexConstant", 1.0
|
||||
"addRowsIndexConstant", 1.0,
|
||||
"uniques", 0.0
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -1095,7 +1146,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 0L,
|
||||
"index", 0.0,
|
||||
"addRowsIndexConstant", 1.0
|
||||
"addRowsIndexConstant", 1.0,
|
||||
"uniques", 0.0
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -1127,7 +1179,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 0L,
|
||||
"index", 0.0,
|
||||
"addRowsIndexConstant", 1.0
|
||||
"addRowsIndexConstant", 1.0,
|
||||
"uniques", 0.0
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -1137,7 +1190,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 0L,
|
||||
"index", 0.0,
|
||||
"addRowsIndexConstant", 1.0
|
||||
"addRowsIndexConstant", 1.0,
|
||||
"uniques", 0.0
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -1183,7 +1237,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 0L,
|
||||
"index", 0.0,
|
||||
"addRowsIndexConstant", 1.0
|
||||
"addRowsIndexConstant", 1.0,
|
||||
"uniques", 0.0
|
||||
)
|
||||
)
|
||||
),
|
||||
|
@ -1193,7 +1248,8 @@ public class TimeseriesQueryRunnerTest
|
|||
ImmutableMap.<String, Object>of(
|
||||
"rows", 0L,
|
||||
"index", 0.0,
|
||||
"addRowsIndexConstant", 1.0
|
||||
"addRowsIndexConstant", 1.0,
|
||||
"uniques", 0.0
|
||||
)
|
||||
)
|
||||
)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -21,6 +21,7 @@ package io.druid.segment;
|
|||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.google.common.io.CharStreams;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.common.io.LineProcessor;
|
||||
|
@ -31,7 +32,10 @@ import io.druid.data.input.impl.TimestampSpec;
|
|||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -52,14 +56,29 @@ public class TestIndex
|
|||
private static QueryableIndex mmappedIndex = null;
|
||||
private static QueryableIndex mergedRealtime = null;
|
||||
|
||||
public static final String[] COLUMNS = new String[]{"ts", "provider", "quALIty", "plAcEmEnT", "pLacementish", "iNdEx"};
|
||||
public static final String[] COLUMNS = new String[]{
|
||||
"ts",
|
||||
"provider",
|
||||
"quALIty",
|
||||
"plAcEmEnT",
|
||||
"pLacementish",
|
||||
"iNdEx",
|
||||
"qualiTy_Uniques"
|
||||
};
|
||||
public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
|
||||
public static final String[] METRICS = new String[]{"iNdEx"};
|
||||
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-04-16T00:00:00.000Z");
|
||||
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
|
||||
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0])
|
||||
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
|
||||
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
|
||||
};
|
||||
|
||||
static {
|
||||
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
|
||||
}
|
||||
}
|
||||
|
||||
public static IncrementalIndex getIncrementalTestIndex()
|
||||
{
|
||||
synchronized (log) {
|
||||
|
|
Loading…
Reference in New Issue