Merge pull request #441 from metamx/hll-optimization

HyperLogLog optimizations and benchmark
This commit is contained in:
fjy 2014-03-27 16:36:30 -06:00
commit de95fbc089
8 changed files with 277 additions and 66 deletions

View File

@ -91,6 +91,10 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -37,12 +37,12 @@ public class HLLCV0 extends HyperLogLogCollector
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();
public HLLCV0()
protected HLLCV0()
{
super(defaultStorageBuffer);
}
public HLLCV0(ByteBuffer buffer)
protected HLLCV0(ByteBuffer buffer)
{
super(buffer);
}
@ -149,4 +149,4 @@ public class HLLCV0 extends HyperLogLogCollector
{
return buffer.position() + HEADER_NUM_BYTES;
}
}
}

View File

@ -44,12 +44,12 @@ public class HLLCV1 extends HyperLogLogCollector
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
.asReadOnlyBuffer();
public HLLCV1()
protected HLLCV1()
{
super(defaultStorageBuffer);
}
public HLLCV1(ByteBuffer buffer)
protected HLLCV1(ByteBuffer buffer)
{
super(buffer);
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.primitives.UnsignedBytes;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.nio.ByteBuffer;
@ -57,7 +56,6 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
private static final Logger log = new Logger(HyperLogLogCollector.class);
private static final int bucketMask = 0x7ff;
private static final int minBytesRequired = 10;
private static final int bitsPerBucket = 4;
@ -202,9 +200,9 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
}
private volatile ByteBuffer storageBuffer;
private volatile int initPosition;
private volatile Double estimatedCardinality;
private ByteBuffer storageBuffer;
private int initPosition;
private Double estimatedCardinality;
public HyperLogLogCollector(ByteBuffer byteBuffer)
{
@ -332,7 +330,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
if (getRegisterOffset() < other.getRegisterOffset()) {
// "Swap" the buffers so that we are folding into the one with the higher offset
ByteBuffer newStorage = ByteBuffer.allocate(other.storageBuffer.remaining());
ByteBuffer newStorage = ByteBuffer.allocateDirect(other.storageBuffer.remaining());
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
newStorage.clear();
@ -342,8 +340,8 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
initPosition = 0;
}
ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
byte otherOffset = other.getRegisterOffset();
final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
final byte otherOffset = other.getRegisterOffset();
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
@ -352,56 +350,50 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();
int offsetDiff = myOffset - otherOffset;
final int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}
byte otherOverflowValue = other.getMaxOverflowValue();
short otherOverflowRegister = other.getMaxOverflowRegister();
add(otherOverflowRegister, otherOverflowValue);
add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());
int myPayloadStart = getPayloadBytePosition();
final int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition());
if (isSparse(otherBuffer)) {
while (otherBuffer.hasRemaining()) {
short position = otherBuffer.getShort();
int payloadStartPosition = position - other.getNumHeaderBytes();
while(otherBuffer.hasRemaining()) {
final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
myPayloadStart + payloadStartPosition,
offsetDiff,
otherBuffer.get()
);
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
}
} else { // dense
int position = getPayloadBytePosition();
while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
position,
offsetDiff,
otherBuffer.get()
);
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
position++;
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
}
}
setRegisterOffset(myOffset);
// no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
setNumNonZeroRegisters(numNonZero);
return this;
@ -531,15 +523,15 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private short decrementBuckets()
{
final int startPosition = getPayloadBytePosition();
short count = 0;
int startPosition = getPayloadBytePosition();
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
byte val = (byte) (storageBuffer.get(i) - 0x11);
final byte val = (byte) (storageBuffer.get(i) - 0x11);
if ((val & 0xf0) != 0) {
count++;
++count;
}
if ((val & 0x0f) != 0) {
count++;
++count;
}
storageBuffer.put(i, val);
}
@ -548,7 +540,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private void convertToMutableByteBuffer()
{
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.position(0);
storageBuffer = tmpBuffer;
@ -557,7 +549,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private void convertToDenseStorage()
{
ByteBuffer tmpBuffer = ByteBuffer.wrap(new byte[getNumBytesForDenseStorage()]);
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(getNumBytesForDenseStorage());
// put header
setVersion(tmpBuffer);
setRegisterOffset(tmpBuffer, getRegisterOffset());
@ -614,39 +606,39 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
*
* @return
*/
private int mergeAndStoreByteRegister(
int position,
int offsetDiff,
byte byteToAdd
private static int mergeAndStoreByteRegister(
final ByteBuffer storageBuffer,
final int position,
final int offsetDiff,
final byte byteToAdd
)
{
if (byteToAdd == 0) {
return 0;
}
byte currVal = storageBuffer.get(position);
final byte currVal = storageBuffer.get(position);
int upperNibble = currVal & 0xf0;
int lowerNibble = currVal & 0x0f;
final int upperNibble = currVal & 0xf0;
final int lowerNibble = currVal & 0x0f;
// subtract the differences so that the nibbles align
int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
int otherLower = (byteToAdd & 0x0f) - offsetDiff;
final int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
final int otherLower = (byteToAdd & 0x0f) - offsetDiff;
final int newUpper = Math.max(upperNibble, otherUpper);
final int newLower = Math.max(lowerNibble, otherLower);
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
int numNoLongerZero = 0;
if (upperNibble == 0 && newUpper > 0) {
++numNoLongerZero;
}
if (lowerNibble == 0 && newLower > 0) {
++numNoLongerZero;
}
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
return numNoLongerZero;
}

View File

@ -66,10 +66,11 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
/**
 * Copies the aggregate stored at {@code position} in {@code buf} into a freshly allocated direct buffer and
 * wraps that copy in a collector.
 *
 * The copy is made through a duplicate of {@code buf}, so the position and limit of {@code buf} itself are
 * not modified.
 *
 * @param buf      buffer holding the aggregate; it must contain at least
 *                 {@code HyperLogLogCollector.getLatestNumBytesForDenseStorage()} bytes starting at {@code position}
 * @param position absolute offset in {@code buf} at which this aggregate starts
 *
 * @return a collector created by {@code HyperLogLogCollector.makeCollector} over the copied bytes
 */
@Override
public Object get(ByteBuffer buf, int position)
{
  final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
  final ByteBuffer dataCopyBuffer = ByteBuffer.allocateDirect(size);

  final ByteBuffer mutationBuffer = buf.duplicate();
  mutationBuffer.position(position);
  // Bound the copy to exactly one dense collector. Without this limit, put() would try to copy everything up
  // to buf's limit and throw BufferOverflowException whenever buf extends past this aggregate.
  mutationBuffer.limit(position + size);

  // Relative bulk put: a direct buffer has no accessible backing array, so array() cannot be used here.
  dataCopyBuffer.put(mutationBuffer);
  dataCopyBuffer.rewind();
  return HyperLogLogCollector.makeCollector(dataCopyBuffer);
}

View File

@ -121,9 +121,7 @@ public class HyperUniquesSerde extends ComplexMetricSerde
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{
buffer.limit(buffer.position() + numBytes);
int remaining = buffer.remaining();
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
return HyperLogLogCollector.makeCollector(buffer);
}
@Override

View File

@ -0,0 +1,203 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
/**
 * Caliper benchmark for {@link HyperLogLogCollector#fold}.
 *
 * {@link #setUp()} builds {@code count} collectors of 40 hashed values each and packs their serialized bytes
 * into one cache-line-aligned direct buffer ({@code chunk}). {@link #timeFold(int)} then folds every packed
 * collector into a single target buffer.
 *
 * The {@code alignment} parameter controls where each packed collector starts relative to a cache line:
 * <ul>
 *   <li>{@code "default"} - collectors are packed back to back and the target is not explicitly aligned</li>
 *   <li>{@code "random"} - each collector starts at a pseudo-random offset within a cache line</li>
 *   <li>an integer in {@code [0, CACHE_LINE)} - every collector starts at that offset within a cache line</li>
 * </ul>
 */
public class HyperLogLogCollectorBenchmark extends SimpleBenchmark
{
  /** Alignment unit in bytes. NOTE(review): presumably the CPU cache line size of the benchmark host. */
  static final int CACHE_LINE = 64;

  private final HashFunction fn = Hashing.murmur3_128();
  // Only ever appended to, so an array-backed list is sufficient.
  private final List<HyperLogLogCollector> collectors = Lists.newArrayList();

  @Param({"true"}) boolean targetIsDirect;
  @Param({"default", "random", "0"}) String alignment;

  boolean alignSource;
  boolean alignTarget;

  ByteBuffer chunk;
  final int count = 100_000;
  int[] positions = new int[count];
  int[] sizes = new int[count];

  /**
   * Interprets {@code alignment}, allocates {@code chunk} and fills it with {@code count} serialized collectors,
   * recording each collector's start position and size in {@code positions} and {@code sizes}.
   *
   * @throws IllegalArgumentException if {@code alignment} is numeric but outside {@code [0, CACHE_LINE)}
   * @throws NumberFormatException    if {@code alignment} is neither a known keyword nor an integer
   */
  @Override
  protected void setUp() throws Exception
  {
    boolean random = false;
    Random rand = new Random(0);
    int defaultOffset = 0;

    switch (alignment) {
      case "default":
        alignSource = false;
        alignTarget = false;
        break;

      case "random":
        random = true;
        // Without enabling alignment here the computed offsets are never applied and this scenario would be
        // byte-for-byte identical to "default".
        alignSource = true;
        // NOTE(review): aligning the target as well is assumed to be the intent - confirm.
        alignTarget = true;
        break;

      default:
        defaultOffset = Integer.parseInt(alignment);
        // An offset outside one cache line can never be matched by (pos % CACHE_LINE); a negative one would
        // even move pos backwards and make entries overlap.
        Preconditions.checkArgument(
            defaultOffset >= 0 && defaultOffset < CACHE_LINE,
            "alignment offset must be in [0, %s), got %s",
            CACHE_LINE,
            defaultOffset
        );
        alignSource = true;
        // NOTE(review): aligning the target as well is assumed to be the intent - confirm.
        alignTarget = true;
    }

    int val = 0;
    // Two extra cache lines per entry leave room for the padding inserted when aligning each source.
    chunk = ByteBuffers.allocateAlignedByteBuffer(
        (HyperLogLogCollector.getLatestNumBytesForDenseStorage() + CACHE_LINE + CACHE_LINE) * count,
        CACHE_LINE
    );

    int pos = 0;
    for (int i = 0; i < count; ++i) {
      HyperLogLogCollector c = HyperLogLogCollector.makeLatestCollector();
      for (int k = 0; k < 40; ++k) {
        c.add(fn.hashInt(++val).asBytes());
      }
      final ByteBuffer sparseHeapCopy = c.toByteBuffer();
      final int size = sparseHeapCopy.remaining();

      // rand is only consumed in "random" mode, keeping the generated layout reproducible (fixed seed).
      final int offset = random ? (int) (rand.nextDouble() * CACHE_LINE) : defaultOffset;

      if (alignSource) {
        final int withinLine = pos % CACHE_LINE;
        if (withinLine != offset) {
          // Advance to the next position whose offset within a cache line equals the requested one.
          pos += withinLine < offset ? offset - withinLine : CACHE_LINE + offset - withinLine;
        }
      }

      positions[i] = pos;
      sizes[i] = size;

      // Carve a window [pos, pos + size) out of the chunk for this collector.
      chunk.limit(pos + size);
      chunk.position(pos);
      final ByteBuffer buf = chunk.duplicate();
      buf.mark();

      pos += size;

      buf.put(sparseHeapCopy);
      buf.reset();
      collectors.add(HyperLogLogCollector.makeCollector(buf));
    }
  }

  /**
   * Allocates a buffer sized for dense storage and writes the empty versioned collector bytes at its start.
   *
   * @param direct  whether to allocate a direct buffer rather than a heap buffer
   * @param aligned for direct buffers only: whether the allocation is cache-line aligned
   * @param offset  for aligned direct buffers only: number of bytes to skip from the aligned start
   *
   * @return a buffer positioned at the start of the empty collector bytes
   */
  private ByteBuffer allocateEmptyHLLBuffer(boolean direct, boolean aligned, int offset)
  {
    final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
    final byte[] emptyBytes = HyperLogLogCollector.makeEmptyVersionedByteArray();
    final ByteBuffer buf;
    if (direct) {
      if (aligned) {
        buf = ByteBuffers.allocateAlignedByteBuffer(size + offset, CACHE_LINE);
        buf.position(offset);
        buf.mark();
        buf.limit(size + offset);
      } else {
        buf = ByteBuffer.allocateDirect(size);
        buf.mark();
        buf.limit(size);
      }

      buf.put(emptyBytes);
      // Return to the marked start so the buffer is positioned at the collector header.
      buf.reset();
    } else {
      buf = ByteBuffer.allocate(size);
      buf.limit(size);
      buf.put(emptyBytes);
      buf.rewind();
    }
    return buf;
  }

  /**
   * Benchmark body: for each repetition, folds every collector packed in {@code chunk} into a collector
   * created over the same target buffer.
   *
   * @param reps number of repetitions requested by the benchmark runner
   *
   * @return the cardinality estimated from the target buffer after all folds
   */
  public double timeFold(int reps) throws Exception
  {
    final ByteBuffer buf = allocateEmptyHLLBuffer(targetIsDirect, alignTarget, 0);

    for (int k = 0; k < reps; ++k) {
      for (int i = 0; i < count; ++i) {
        final int pos = positions[i];
        final int size = sizes[i];

        HyperLogLogCollector.makeCollector(
            (ByteBuffer) buf.duplicate().position(0).limit(
                HyperLogLogCollector.getLatestNumBytesForDenseStorage()
            )
        ).fold(
            HyperLogLogCollector.makeCollector(
                (ByteBuffer) chunk.duplicate().limit(pos + size).position(pos)
            )
        );
      }
    }
    return HyperLogLogCollector.makeCollector(buf.duplicate()).estimateCardinality();
  }

  public static void main(String[] args) throws Exception
  {
    Runner.main(HyperLogLogCollectorBenchmark.class, args);
  }
}
/**
 * Static helpers for allocating direct {@link ByteBuffer}s whose native start address is aligned.
 *
 * Relies on {@code sun.misc.Unsafe} and on the private {@code address} field of {@link Buffer}; class
 * initialization fails with a {@link RuntimeException} if either cannot be accessed.
 */
final class ByteBuffers {
  private static final Unsafe UNSAFE;
  private static final long ADDRESS_OFFSET;

  static {
    try {
      Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
      theUnsafe.setAccessible(true);
      UNSAFE = (Unsafe) theUnsafe.get(null);
      ADDRESS_OFFSET = UNSAFE.objectFieldOffset(Buffer.class.getDeclaredField("address"));
    } catch (Exception e) {
      throw new RuntimeException("Cannot access Unsafe methods", e);
    }
  }

  // Static utility holder: not meant to be instantiated.
  private ByteBuffers() {
  }

  /**
   * Reads the {@code address} field of the given buffer.
   *
   * @param buf buffer to inspect. NOTE(review): presumably only meaningful for direct buffers - confirm.
   *
   * @return the value of the buffer's {@code address} field
   */
  public static long getAddress(ByteBuffer buf) {
    return UNSAFE.getLong(buf, ADDRESS_OFFSET);
  }

  /**
   * Allocates a direct buffer of the requested capacity whose start address is a multiple of {@code align}.
   *
   * Over-allocates by {@code align} bytes and returns a slice that begins at the first aligned address.
   *
   * @param capacity capacity in bytes of the returned buffer
   * @param align    required alignment in bytes; must be a power of 2
   *
   * @return a direct buffer slice with position 0 and the requested capacity
   *
   * @throws IllegalArgumentException if {@code align} is not a power of 2
   */
  public static ByteBuffer allocateAlignedByteBuffer(int capacity, int align) {
    Preconditions.checkArgument(Long.bitCount(align) == 1, "Alignment must be a power of 2");
    final ByteBuffer buf = ByteBuffer.allocateDirect(capacity + align);
    final long address = getAddress(buf);
    // For a power-of-2 alignment, the low bits of the address give its distance past the previous boundary.
    final long misalignment = address & (align - 1);
    if (misalignment == 0) {
      buf.limit(capacity);
    } else {
      // Skip forward to the next aligned address.
      final int offset = (int) (align - misalignment);
      buf.position(offset);
      buf.limit(offset + capacity);
    }
    return buf.slice();
  }
}

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.ByteBuffer;
@ -70,17 +71,29 @@ public class HyperLogLogCollectorTest
}
}
// @Test
/**
* This is a very long running test, disabled by default.
* It is meant to catch issues when combining a large number of HLL objects.
*
* It compares adding all the values to one HLL vs.
* splitting up values into HLLs of 100 values each, and folding those HLLs into a single main HLL.
*
* When reaching very large cardinalities (>> 50,000,000), offsets are mismatched between the main HLL and the ones
* with 100 values, requiring a floating max as described in
* http://druid.io/blog/2014/02/18/hyperloglog-optimizations-for-real-world-systems.html
*/
@Ignore @Test
public void testHighCardinalityRollingFold() throws Exception
{
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
final HyperLogLogCollector simple = HyperLogLogCollector.makeLatestCollector();
int count;
MessageDigest md = MessageDigest.getInstance("SHA-1");
HyperLogLogCollector tmp = HyperLogLogCollector.makeLatestCollector();
for (count = 0; count < 5000000; ++count) {
int count;
for (count = 0; count < 100_000_000; ++count) {
md.update(Integer.toString(count).getBytes());
byte[] hashed = fn.hashBytes(md.digest()).asBytes();
@ -110,14 +123,14 @@ public class HyperLogLogCollectorTest
Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05);
}
//@Test
@Ignore @Test
public void testHighCardinalityRollingFold2() throws Exception
{
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
int count;
long start = System.currentTimeMillis();
for (count = 0; count < 5000000; ++count) {
for (count = 0; count < 50_000_000; ++count) {
HyperLogLogCollector theCollector = HyperLogLogCollector.makeLatestCollector();
theCollector.add(fn.hashLong(count).asBytes());
rolling.fold(theCollector);