mirror of https://github.com/apache/druid.git
minor hll optimizations
This commit is contained in:
parent
5eaadfffa9
commit
46a5409207
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonValue;
|
||||||
import com.google.common.primitives.UnsignedBytes;
|
import com.google.common.primitives.UnsignedBytes;
|
||||||
import com.metamx.common.IAE;
|
import com.metamx.common.IAE;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.logger.Logger;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
@ -57,7 +56,6 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
|
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
|
||||||
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
|
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
|
||||||
|
|
||||||
private static final Logger log = new Logger(HyperLogLogCollector.class);
|
|
||||||
private static final int bucketMask = 0x7ff;
|
private static final int bucketMask = 0x7ff;
|
||||||
private static final int minBytesRequired = 10;
|
private static final int minBytesRequired = 10;
|
||||||
private static final int bitsPerBucket = 4;
|
private static final int bitsPerBucket = 4;
|
||||||
|
@ -332,7 +330,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
|
|
||||||
if (getRegisterOffset() < other.getRegisterOffset()) {
|
if (getRegisterOffset() < other.getRegisterOffset()) {
|
||||||
// "Swap" the buffers so that we are folding into the one with the higher offset
|
// "Swap" the buffers so that we are folding into the one with the higher offset
|
||||||
ByteBuffer newStorage = ByteBuffer.allocate(other.storageBuffer.remaining());
|
ByteBuffer newStorage = ByteBuffer.allocateDirect(other.storageBuffer.remaining());
|
||||||
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
|
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
|
||||||
newStorage.clear();
|
newStorage.clear();
|
||||||
|
|
||||||
|
@ -342,8 +340,8 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
initPosition = 0;
|
initPosition = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
|
final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
|
||||||
byte otherOffset = other.getRegisterOffset();
|
final byte otherOffset = other.getRegisterOffset();
|
||||||
|
|
||||||
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
|
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
|
||||||
convertToDenseStorage();
|
convertToDenseStorage();
|
||||||
|
@ -357,25 +355,22 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
|
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
|
||||||
}
|
}
|
||||||
|
|
||||||
byte otherOverflowValue = other.getMaxOverflowValue();
|
add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());
|
||||||
short otherOverflowRegister = other.getMaxOverflowRegister();
|
|
||||||
add(otherOverflowRegister, otherOverflowValue);
|
|
||||||
|
|
||||||
int myPayloadStart = getPayloadBytePosition();
|
final int myPayloadStart = getPayloadBytePosition();
|
||||||
otherBuffer.position(other.getPayloadBytePosition());
|
otherBuffer.position(other.getPayloadBytePosition());
|
||||||
|
|
||||||
if (isSparse(otherBuffer)) {
|
if (isSparse(otherBuffer)) {
|
||||||
while (otherBuffer.hasRemaining()) {
|
while(otherBuffer.hasRemaining()) {
|
||||||
short position = otherBuffer.getShort();
|
final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
|
||||||
int payloadStartPosition = position - other.getNumHeaderBytes();
|
|
||||||
numNonZero += mergeAndStoreByteRegister(
|
numNonZero += mergeAndStoreByteRegister(
|
||||||
myPayloadStart + payloadStartPosition,
|
myPayloadStart + payloadStartPosition,
|
||||||
offsetDiff,
|
offsetDiff,
|
||||||
otherBuffer.get()
|
otherBuffer.get()
|
||||||
);
|
);
|
||||||
if (numNonZero == NUM_BUCKETS) {
|
if (numNonZero == NUM_BUCKETS) {
|
||||||
myOffset += 1;
|
|
||||||
numNonZero = decrementBuckets();
|
numNonZero = decrementBuckets();
|
||||||
setRegisterOffset(myOffset);
|
setRegisterOffset(++myOffset);
|
||||||
setNumNonZeroRegisters(numNonZero);
|
setNumNonZeroRegisters(numNonZero);
|
||||||
|
|
||||||
offsetDiff = myOffset - otherOffset;
|
offsetDiff = myOffset - otherOffset;
|
||||||
|
@ -390,9 +385,8 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
otherBuffer.get()
|
otherBuffer.get()
|
||||||
);
|
);
|
||||||
if (numNonZero == NUM_BUCKETS) {
|
if (numNonZero == NUM_BUCKETS) {
|
||||||
myOffset += 1;
|
|
||||||
numNonZero = decrementBuckets();
|
numNonZero = decrementBuckets();
|
||||||
setRegisterOffset(myOffset);
|
setRegisterOffset(++myOffset);
|
||||||
setNumNonZeroRegisters(numNonZero);
|
setNumNonZeroRegisters(numNonZero);
|
||||||
|
|
||||||
offsetDiff = myOffset - otherOffset;
|
offsetDiff = myOffset - otherOffset;
|
||||||
|
@ -401,7 +395,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setRegisterOffset(myOffset);
|
// no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
|
||||||
setNumNonZeroRegisters(numNonZero);
|
setNumNonZeroRegisters(numNonZero);
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
|
@ -531,15 +525,15 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
|
|
||||||
private short decrementBuckets()
|
private short decrementBuckets()
|
||||||
{
|
{
|
||||||
short count = 0;
|
|
||||||
int startPosition = getPayloadBytePosition();
|
int startPosition = getPayloadBytePosition();
|
||||||
|
short count = 0;
|
||||||
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
|
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
|
||||||
byte val = (byte) (storageBuffer.get(i) - 0x11);
|
final byte val = (byte) (storageBuffer.get(i) - 0x11);
|
||||||
if ((val & 0xf0) != 0) {
|
if ((val & 0xf0) != 0) {
|
||||||
count++;
|
++count;
|
||||||
}
|
}
|
||||||
if ((val & 0x0f) != 0) {
|
if ((val & 0x0f) != 0) {
|
||||||
count++;
|
++count;
|
||||||
}
|
}
|
||||||
storageBuffer.put(i, val);
|
storageBuffer.put(i, val);
|
||||||
}
|
}
|
||||||
|
@ -548,7 +542,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
|
|
||||||
private void convertToMutableByteBuffer()
|
private void convertToMutableByteBuffer()
|
||||||
{
|
{
|
||||||
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
|
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(storageBuffer.remaining());
|
||||||
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
|
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
|
||||||
tmpBuffer.position(0);
|
tmpBuffer.position(0);
|
||||||
storageBuffer = tmpBuffer;
|
storageBuffer = tmpBuffer;
|
||||||
|
@ -557,7 +551,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
|
|
||||||
private void convertToDenseStorage()
|
private void convertToDenseStorage()
|
||||||
{
|
{
|
||||||
ByteBuffer tmpBuffer = ByteBuffer.wrap(new byte[getNumBytesForDenseStorage()]);
|
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(getNumBytesForDenseStorage());
|
||||||
// put header
|
// put header
|
||||||
setVersion(tmpBuffer);
|
setVersion(tmpBuffer);
|
||||||
setRegisterOffset(tmpBuffer, getRegisterOffset());
|
setRegisterOffset(tmpBuffer, getRegisterOffset());
|
||||||
|
@ -615,38 +609,37 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private int mergeAndStoreByteRegister(
|
private int mergeAndStoreByteRegister(
|
||||||
int position,
|
final int position,
|
||||||
int offsetDiff,
|
final int offsetDiff,
|
||||||
byte byteToAdd
|
final byte byteToAdd
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if (byteToAdd == 0) {
|
if (byteToAdd == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
byte currVal = storageBuffer.get(position);
|
final byte currVal = storageBuffer.get(position);
|
||||||
|
|
||||||
int upperNibble = currVal & 0xf0;
|
final int upperNibble = currVal & 0xf0;
|
||||||
int lowerNibble = currVal & 0x0f;
|
final int lowerNibble = currVal & 0x0f;
|
||||||
|
|
||||||
// subtract the differences so that the nibbles align
|
// subtract the differences so that the nibbles align
|
||||||
int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
|
final int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
|
||||||
int otherLower = (byteToAdd & 0x0f) - offsetDiff;
|
final int otherLower = (byteToAdd & 0x0f) - offsetDiff;
|
||||||
|
|
||||||
final int newUpper = Math.max(upperNibble, otherUpper);
|
final int newUpper = Math.max(upperNibble, otherUpper);
|
||||||
final int newLower = Math.max(lowerNibble, otherLower);
|
final int newLower = Math.max(lowerNibble, otherLower);
|
||||||
|
|
||||||
|
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
|
||||||
|
|
||||||
int numNoLongerZero = 0;
|
int numNoLongerZero = 0;
|
||||||
if (upperNibble == 0 && newUpper > 0) {
|
if (upperNibble == 0 && newUpper > 0) {
|
||||||
++numNoLongerZero;
|
++numNoLongerZero;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lowerNibble == 0 && newLower > 0) {
|
if (lowerNibble == 0 && newLower > 0) {
|
||||||
++numNoLongerZero;
|
++numNoLongerZero;
|
||||||
}
|
}
|
||||||
|
|
||||||
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
|
|
||||||
|
|
||||||
return numNoLongerZero;
|
return numNoLongerZero;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue