mirror of https://github.com/apache/druid.git
Densify swapped hll buffer (#6865)
* Densify swapped hll buffer * Make test loop limit pre-increment * Reformat * Fix test comments
This commit is contained in:
parent
e48a9c138e
commit
3ed250787d
|
@ -387,6 +387,13 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
|||
|
||||
storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer());
|
||||
|
||||
if (other.storageBuffer.remaining() != other.getNumBytesForDenseStorage()) {
|
||||
// The other buffer was sparse, densify it
|
||||
final int newLImit = storageBuffer.position() + other.storageBuffer.remaining();
|
||||
storageBuffer.limit(newLImit);
|
||||
convertToDenseStorage();
|
||||
}
|
||||
|
||||
other = HyperLogLogCollector.makeCollector(tmpBuffer);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.hll;
|
|||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hasher;
|
||||
import com.google.common.hash.Hashing;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
@ -30,14 +31,17 @@ import org.junit.Ignore;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class HyperLogLogCollectorTest
|
||||
{
|
||||
|
@ -45,6 +49,18 @@ public class HyperLogLogCollectorTest
|
|||
|
||||
private final HashFunction fn = Hashing.murmur3_128();
|
||||
|
||||
private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
|
||||
{
|
||||
byte offset = startOffset;
|
||||
while (offset <= endOffset) {
|
||||
// fill buckets to shift registerOffset
|
||||
for (short bucket = 0; bucket < 2048; ++bucket) {
|
||||
collector.add(bucket, offset);
|
||||
}
|
||||
offset++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFolding()
|
||||
{
|
||||
|
@ -78,7 +94,6 @@ public class HyperLogLogCollectorTest
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This is a very long running test, disabled by default.
|
||||
* It is meant to catch issues when combining a large numer of HLL objects.
|
||||
|
@ -502,7 +517,8 @@ public class HyperLogLogCollectorTest
|
|||
return retVal;
|
||||
}
|
||||
|
||||
@Ignore @Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
|
||||
@Ignore
|
||||
@Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
|
||||
public void testFoldingwithDifferentOffsets()
|
||||
{
|
||||
// final Random random = new Random(37); // this seed will cause this test to fail because of slightly larger errors
|
||||
|
@ -533,7 +549,8 @@ public class HyperLogLogCollectorTest
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Ignore
|
||||
@Test
|
||||
public void testFoldingwithDifferentOffsets2() throws Exception
|
||||
{
|
||||
final Random random = new Random(0);
|
||||
|
@ -707,6 +724,81 @@ public class HyperLogLogCollectorTest
|
|||
Assert.assertEquals(0, collector.getNumNonZeroRegisters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterSwapWithSparse()
|
||||
{
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
// Skip the first bucket
|
||||
for (int i = 1; i < HyperLogLogCollector.NUM_BUCKETS; i++) {
|
||||
collector.add((short) i, (byte) 1);
|
||||
Assert.assertEquals(i, collector.getNumNonZeroRegisters());
|
||||
Assert.assertEquals(0, collector.getRegisterOffset());
|
||||
}
|
||||
Assert.assertEquals(
|
||||
15615.219683654448D,
|
||||
HyperLogLogCollector.makeCollector(collector.toByteBuffer().asReadOnlyBuffer())
|
||||
.estimateCardinality(),
|
||||
1e-5D
|
||||
);
|
||||
|
||||
final byte[] hash = new byte[10];
|
||||
hash[0] = 1; // Bucket 0, 1 offset of 0
|
||||
collector.add(hash);
|
||||
Assert.assertEquals(0, collector.getNumNonZeroRegisters());
|
||||
Assert.assertEquals(1, collector.getRegisterOffset());
|
||||
|
||||
// We have a REALLY bad distribution, Sketch as 0 is fine.
|
||||
Assert.assertEquals(
|
||||
0.0D,
|
||||
HyperLogLogCollector.makeCollector(collector.toByteBuffer().asReadOnlyBuffer())
|
||||
.estimateCardinality(),
|
||||
1e-5D
|
||||
);
|
||||
final ByteBuffer buffer = collector.toByteBuffer();
|
||||
Assert.assertEquals(collector.getNumHeaderBytes(), buffer.remaining());
|
||||
|
||||
final HyperLogLogCollector denseCollector = HyperLogLogCollector.makeLatestCollector();
|
||||
for (int i = 0; i < HyperLogLogCollector.NUM_BUCKETS - 1; i++) {
|
||||
denseCollector.add((short) i, (byte) 1);
|
||||
}
|
||||
|
||||
Assert.assertEquals(HyperLogLogCollector.NUM_BUCKETS - 1, denseCollector.getNumNonZeroRegisters());
|
||||
final HyperLogLogCollector folded = denseCollector.fold(HyperLogLogCollector.makeCollector(buffer));
|
||||
Assert.assertNotNull(folded.toByteBuffer());
|
||||
Assert.assertEquals(folded.getStorageBuffer().remaining(), denseCollector.getNumBytesForDenseStorage());
|
||||
}
|
||||
|
||||
// Example of a terrible sampling filter. Don't use this method
|
||||
@Test
|
||||
public void testCanFillUpOnMod()
|
||||
{
|
||||
final HashFunction fn = Hashing.murmur3_128();
|
||||
final HyperLogLogCollector hyperLogLogCollector = HyperLogLogCollector.makeLatestCollector();
|
||||
final byte[] b = new byte[10];
|
||||
b[0] = 1;
|
||||
hyperLogLogCollector.add(b);
|
||||
final Random random = new Random(347893248701078L);
|
||||
long loops = 0;
|
||||
// Do a 1% "sample" where the mod of the hash is 43
|
||||
final Predicate<Integer> pass = i -> {
|
||||
// ByteOrder.nativeOrder() on lots of systems is ByteOrder.LITTLE_ENDIAN
|
||||
final ByteBuffer bb = ByteBuffer.wrap(fn.hashInt(i).asBytes()).order(ByteOrder.LITTLE_ENDIAN);
|
||||
return (bb.getInt() % 100) == 43;
|
||||
};
|
||||
final long loopLimit = 1_000_000_000L;
|
||||
do {
|
||||
final int rnd = random.nextInt();
|
||||
if (!pass.test(rnd)) {
|
||||
continue;
|
||||
}
|
||||
final Hasher hasher = fn.newHasher();
|
||||
hasher.putInt(rnd);
|
||||
hyperLogLogCollector.add(hasher.hash().asBytes());
|
||||
} while (hyperLogLogCollector.getNumNonZeroRegisters() > 0 && ++loops < loopLimit);
|
||||
Assert.assertNotEquals(loopLimit, loops);
|
||||
Assert.assertEquals(hyperLogLogCollector.getNumHeaderBytes(), hyperLogLogCollector.toByteBuffer().remaining());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeMaxOverflow()
|
||||
{
|
||||
|
@ -736,19 +828,6 @@ public class HyperLogLogCollectorTest
|
|||
Assert.assertEquals(67, collector.getMaxOverflowValue());
|
||||
}
|
||||
|
||||
|
||||
private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
|
||||
{
|
||||
byte offset = startOffset;
|
||||
while (offset <= endOffset) {
|
||||
// fill buckets to shift registerOffset
|
||||
for (short bucket = 0; bucket < 2048; ++bucket) {
|
||||
collector.add(bucket, offset);
|
||||
}
|
||||
offset++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoldOrder()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue