Adds bucketOrd back to cardinality algorithms (#62389) (#62427)

This commit is contained in:
Ignacio Vera 2020-09-16 08:41:57 +02:00 committed by GitHub
parent 8566e9e3e7
commit f3ed641fc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 110 deletions

View File

@ -45,7 +45,7 @@ abstract class AbstractCardinalityAlgorithm {
} }
/** Returns the current computed cardinality */ /** Returns the current computed cardinality */
public abstract long cardinality(); public abstract long cardinality(long bucketOrd);
static long linearCounting(long m, long v) { static long linearCounting(long m, long v) {
return Math.round(m * Math.log((double) m / v)); return Math.round(m * Math.log((double) m / v));

View File

@ -740,22 +740,22 @@ public abstract class AbstractHyperLogLog extends AbstractCardinalityAlgorithm {
/** Add a new runLen to the register. Implementor should only keep the value if it is /** Add a new runLen to the register. Implementor should only keep the value if it is
* bigger that the current value of the register provided. */ * bigger that the current value of the register provided. */
protected abstract void addRunLen(int register, int runLen); protected abstract void addRunLen(long bucketOrd, int register, int runLen);
/** Returns an iterator over all values of the register. */ /** Returns an iterator over all values of the register. */
protected abstract RunLenIterator getRunLens(); protected abstract RunLenIterator getRunLens(long bucketOrd);
public void collect(long hash) { public void collect(long bucketOrd, long hash) {
final int index = Math.toIntExact(index(hash, p)); final int index = Math.toIntExact(index(hash, p));
final int runLen = runLen(hash, p); final int runLen = runLen(hash, p);
addRunLen(index, runLen); addRunLen(bucketOrd, index, runLen);
} }
@Override @Override
public long cardinality() { public long cardinality(long bucketOrd) {
double inverseSum = 0; double inverseSum = 0;
int zeros = 0; int zeros = 0;
RunLenIterator iterator = getRunLens(); RunLenIterator iterator = getRunLens(bucketOrd);
while (iterator.next()) { while (iterator.next()) {
final int runLen = iterator.value(); final int runLen = iterator.value();
inverseSum += 1. / (1L << runLen); inverseSum += 1. / (1L << runLen);
@ -778,20 +778,20 @@ public abstract class AbstractHyperLogLog extends AbstractCardinalityAlgorithm {
} }
} }
public void collectEncoded(int encoded) { public void collectEncoded(long bucketOrd, int encoded) {
final int runLen = decodeRunLen(encoded, p); final int runLen = decodeRunLen(encoded, p);
final int index = decodeIndex(encoded, p); final int index = decodeIndex(encoded, p);
addRunLen(index, runLen); addRunLen(bucketOrd, index, runLen);
} }
public void merge(AbstractHyperLogLog other) { public void merge(long thisBucketOrd, AbstractHyperLogLog other, long otherBucketOrd) {
if (p != other.p) { if (p != other.p) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
RunLenIterator iterator = other.getRunLens(); RunLenIterator iterator = other.getRunLens(otherBucketOrd);
for (int i = 0; i < m; ++i) { for (int i = 0; i < m; ++i) {
iterator.next(); iterator.next();
addRunLen(i, iterator.value()); addRunLen(thisBucketOrd, i, iterator.value());
} }
} }

View File

@ -42,26 +42,26 @@ public abstract class AbstractLinearCounting extends AbstractCardinalityAlgorith
* Add encoded value to the linear counting. Implementor should only accept the value if it has not been * Add encoded value to the linear counting. Implementor should only accept the value if it has not been
* seen before. * seen before.
*/ */
protected abstract int addEncoded(int encoded); protected abstract int addEncoded(long bucketOrd, int encoded);
/** /**
* number of values in the counter. * number of values in the counter.
*/ */
protected abstract int size(); protected abstract int size(long bucketOrd);
/** /**
* return the current values in the counter. * return the current values in the counter.
*/ */
protected abstract HashesIterator values(); protected abstract HashesIterator values(long bucketOrd);
public int collect(long hash) { public int collect(long bucketOrd, long hash) {
final int k = encodeHash(hash, p); final int k = encodeHash(hash, p);
return addEncoded(k); return addEncoded(bucketOrd, k);
} }
public long cardinality() { public long cardinality(long bucketOrd) {
final long m = 1 << P2; final long m = 1 << P2;
final long v = m - size(); final long v = m - size(bucketOrd);
return linearCounting(m, v); return linearCounting(m, v);
} }

View File

@ -104,61 +104,59 @@ public final class HyperLogLogPlusPlus implements Releasable {
if (precision() != other.precision()) { if (precision() != other.precision()) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
hll.bucket = thisBucket;
lc.bucket = thisBucket;
hll.ensureCapacity(thisBucket + 1); hll.ensureCapacity(thisBucket + 1);
if (other.algorithm.get(otherBucket) == LINEAR_COUNTING) { if (other.algorithm.get(otherBucket) == LINEAR_COUNTING) {
other.lc.bucket = otherBucket; merge(thisBucket, other.lc, otherBucket);
final AbstractLinearCounting.HashesIterator values = other.lc.values();
while (values.next()) {
final int encoded = values.value();
if (algorithm.get(thisBucket) == LINEAR_COUNTING) {
final int newSize = lc.addEncoded(encoded);
if (newSize > lc.threshold) {
upgradeToHll(thisBucket);
}
} else {
hll.collectEncoded(encoded);
}
}
} else { } else {
if (algorithm.get(thisBucket) != HYPERLOGLOG) { merge(thisBucket, other.hll, otherBucket);
upgradeToHll(thisBucket);
}
other.hll.bucket = otherBucket;
hll.merge(other.hll);
} }
} }
private void merge(long thisBucket, AbstractLinearCounting other, long otherBucket) {
final AbstractLinearCounting.HashesIterator values = other.values(otherBucket);
while (values.next()) {
final int encoded = values.value();
if (algorithm.get(thisBucket) == LINEAR_COUNTING) {
final int newSize = lc.addEncoded(thisBucket, encoded);
if (newSize > lc.threshold) {
upgradeToHll(thisBucket);
}
} else {
hll.collectEncoded(thisBucket, encoded);
}
}
}
private void merge(long thisBucket, AbstractHyperLogLog other, long otherBucket) {
if (algorithm.get(thisBucket) != HYPERLOGLOG) {
upgradeToHll(thisBucket);
}
hll.merge(thisBucket, other, otherBucket);
}
public void collect(long bucket, long hash) { public void collect(long bucket, long hash) {
hll.ensureCapacity(bucket + 1); hll.ensureCapacity(bucket + 1);
if (algorithm.get(bucket) == LINEAR_COUNTING) { if (algorithm.get(bucket) == LINEAR_COUNTING) {
lc.bucket = bucket; final int newSize = lc.collect(bucket, hash);
final int newSize = lc.collect(hash);
if (newSize > lc.threshold) { if (newSize > lc.threshold) {
upgradeToHll(bucket); upgradeToHll(bucket);
} }
} else { } else {
hll.bucket = bucket; hll.collect(bucket, hash);
hll.collect(hash);
} }
} }
public long cardinality(long bucket) { public long cardinality(long bucket) {
if (algorithm.get(bucket) == LINEAR_COUNTING) { if (algorithm.get(bucket) == LINEAR_COUNTING) {
lc.bucket = bucket; return lc.cardinality(bucket);
return lc.cardinality();
} else { } else {
hll.bucket = bucket; return hll.cardinality(bucket);
return hll.cardinality();
} }
} }
void upgradeToHll(long bucket) { void upgradeToHll(long bucket) {
hll.ensureCapacity(bucket + 1); hll.ensureCapacity(bucket + 1);
lc.bucket = bucket; final AbstractLinearCounting.HashesIterator hashes = lc.values(bucket);
hll.bucket = bucket;
final AbstractLinearCounting.HashesIterator hashes = lc.values();
// We need to copy values into an arrays as we will override // We need to copy values into an arrays as we will override
// the values on the buffer // the values on the buffer
final IntArray values = lc.bigArrays.newIntArray(hashes.size()); final IntArray values = lc.bigArrays.newIntArray(hashes.size());
@ -168,10 +166,10 @@ public final class HyperLogLogPlusPlus implements Releasable {
values.set(i++, hashes.value()); values.set(i++, hashes.value());
} }
assert i == hashes.size(); assert i == hashes.size();
hll.reset(); hll.reset(bucket);
for (long j = 0; j < values.size(); ++j) { for (long j = 0; j < values.size(); ++j) {
final int encoded = values.get(j); final int encoded = values.get(j);
hll.collectEncoded(encoded); hll.collectEncoded(bucket, encoded);
} }
algorithm.set(bucket); algorithm.set(bucket);
} finally { } finally {
@ -186,11 +184,9 @@ public final class HyperLogLogPlusPlus implements Releasable {
private Object getComparableData(long bucket) { private Object getComparableData(long bucket) {
if (algorithm.get(bucket) == LINEAR_COUNTING) { if (algorithm.get(bucket) == LINEAR_COUNTING) {
lc.bucket = bucket; return lc.getComparableData(bucket);
return lc.getComparableData();
} else { } else {
hll.bucket = bucket; return hll.getComparableData(bucket);
return hll.getComparableData();
} }
} }
@ -208,16 +204,14 @@ public final class HyperLogLogPlusPlus implements Releasable {
out.writeVInt(precision()); out.writeVInt(precision());
if (algorithm.get(bucket) == LINEAR_COUNTING) { if (algorithm.get(bucket) == LINEAR_COUNTING) {
out.writeBoolean(LINEAR_COUNTING); out.writeBoolean(LINEAR_COUNTING);
lc.bucket = bucket; AbstractLinearCounting.HashesIterator hashes = lc.values(bucket);
AbstractLinearCounting.HashesIterator hashes = lc.values();
out.writeVLong(hashes.size()); out.writeVLong(hashes.size());
while (hashes.next()) { while (hashes.next()) {
out.writeInt(hashes.value()); out.writeInt(hashes.value());
} }
} else { } else {
out.writeBoolean(HYPERLOGLOG); out.writeBoolean(HYPERLOGLOG);
hll.bucket = bucket; AbstractHyperLogLog.RunLenIterator iterator = hll.getRunLens(bucket);
AbstractHyperLogLog.RunLenIterator iterator = hll.getRunLens();
while (iterator.next()){ while (iterator.next()){
out.writeByte(iterator.value()); out.writeByte(iterator.value());
} }
@ -231,29 +225,25 @@ public final class HyperLogLogPlusPlus implements Releasable {
if (algorithm == LINEAR_COUNTING) { if (algorithm == LINEAR_COUNTING) {
counts.algorithm.clear(0); counts.algorithm.clear(0);
final long size = in.readVLong(); final long size = in.readVLong();
counts.lc.bucket = 0;
for (long i = 0; i < size; ++i) { for (long i = 0; i < size; ++i) {
final int encoded = in.readInt(); final int encoded = in.readInt();
counts.lc.addEncoded(encoded); counts.lc.addEncoded(0, encoded);
} }
} else { } else {
counts.algorithm.set(0); counts.algorithm.set(0);
counts.hll.bucket = 0;
for (int i = 0; i < counts.hll.m; ++i) { for (int i = 0; i < counts.hll.m; ++i) {
counts.hll.addRunLen(i, in.readByte()); counts.hll.addRunLen(0, i, in.readByte());
} }
} }
return counts; return counts;
} }
private static class HyperLogLog extends AbstractHyperLogLog implements Releasable { private static class HyperLogLog extends AbstractHyperLogLog implements Releasable {
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final HyperLogLogIterator iterator; private final HyperLogLogIterator iterator;
// array for holding the runlens. // array for holding the runlens.
private ByteArray runLens; private ByteArray runLens;
// Defines the position of the data structure. Callers of this object should set this value
// before calling any of the methods.
protected long bucket;
HyperLogLog(BigArrays bigArrays, long initialBucketCount, int precision) { HyperLogLog(BigArrays bigArrays, long initialBucketCount, int precision) {
super(precision); super(precision);
@ -263,25 +253,25 @@ public final class HyperLogLogPlusPlus implements Releasable {
} }
@Override @Override
protected void addRunLen(int register, int encoded) { protected void addRunLen(long bucketOrd, int register, int encoded) {
final long bucketIndex = (bucket << p) + register; final long bucketIndex = (bucketOrd << p) + register;
runLens.set(bucketIndex, (byte) Math.max(encoded, runLens.get(bucketIndex))); runLens.set(bucketIndex, (byte) Math.max(encoded, runLens.get(bucketIndex)));
} }
@Override @Override
protected RunLenIterator getRunLens() { protected RunLenIterator getRunLens(long bucketOrd) {
iterator.reset(bucket); iterator.reset(bucketOrd);
return iterator; return iterator;
} }
protected void reset() { protected void reset(long bucketOrd) {
runLens.fill(bucket << p, (bucket << p) + m, (byte) 0); runLens.fill(bucketOrd << p, (bucketOrd << p) + m, (byte) 0);
} }
protected Object getComparableData() { protected Object getComparableData(long bucketOrd) {
Map<Byte, Integer> values = new HashMap<>(); Map<Byte, Integer> values = new HashMap<>();
for (long i = 0; i < runLens.size(); i++) { for (long i = 0; i < runLens.size(); i++) {
byte runLength = runLens.get((bucket << p) + i); byte runLength = runLens.get((bucketOrd << p) + i);
Integer numOccurances = values.get(runLength); Integer numOccurances = values.get(runLength);
if (numOccurances == null) { if (numOccurances == null) {
values.put(runLength, 1); values.put(runLength, 1);
@ -306,8 +296,8 @@ public final class HyperLogLogPlusPlus implements Releasable {
private final HyperLogLog hll; private final HyperLogLog hll;
private final int m, p; private final int m, p;
int pos; private int pos;
long start; private long start;
private byte value; private byte value;
HyperLogLogIterator(HyperLogLog hll, int p, int m) { HyperLogLogIterator(HyperLogLog hll, int p, int m) {
@ -339,28 +329,22 @@ public final class HyperLogLogPlusPlus implements Releasable {
private static class LinearCounting extends AbstractLinearCounting implements Releasable { private static class LinearCounting extends AbstractLinearCounting implements Releasable {
private final int capacity;
protected final int threshold; protected final int threshold;
private final int mask; private final int mask;
private final BytesRef readSpare; private final BytesRef readSpare;
private final ByteBuffer writeSpare; private final ByteBuffer writeSpare;
private final int p;
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final LinearCountingIterator iterator; private final LinearCountingIterator iterator;
// We are actually using HyperLogLog's runLens array but interpreting it as a hash set for linear counting. // We are actually using HyperLogLog's runLens array but interpreting it as a hash set for linear counting.
private final HyperLogLog hll; private final HyperLogLog hll;
// Number of elements stored. // Number of elements stored.
private IntArray sizes; private IntArray sizes;
// Defines the position of the data structure. Callers of this object should set this value
// before calling any of the methods.
protected long bucket;
LinearCounting(BigArrays bigArrays, long initialBucketCount, int p, HyperLogLog hll) { LinearCounting(BigArrays bigArrays, long initialBucketCount, int p, HyperLogLog hll) {
super(p); super(p);
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
this.hll = hll; this.hll = hll;
capacity = (1 << p) / 4; // because ints take 4 bytes final int capacity = (1 << p) / 4; // because ints take 4 bytes
this.p = p;
threshold = (int) (capacity * MAX_LOAD_FACTOR); threshold = (int) (capacity * MAX_LOAD_FACTOR);
mask = capacity - 1; mask = capacity - 1;
sizes = bigArrays.newIntArray(initialBucketCount); sizes = bigArrays.newIntArray(initialBucketCount);
@ -370,15 +354,15 @@ public final class HyperLogLogPlusPlus implements Releasable {
} }
@Override @Override
protected int addEncoded(int encoded) { protected int addEncoded(long bucketOrd, int encoded) {
sizes = bigArrays.grow(sizes, bucket + 1); sizes = bigArrays.grow(sizes, bucketOrd + 1);
assert encoded != 0; assert encoded != 0;
for (int i = (encoded & mask);; i = (i + 1) & mask) { for (int i = (encoded & mask);; i = (i + 1) & mask) {
final int v = get(i); final int v = get(bucketOrd, i);
if (v == 0) { if (v == 0) {
// means unused, take it! // means unused, take it!
set(i, encoded); set(bucketOrd, i, encoded);
return sizes.increment(bucket, 1); return sizes.increment(bucketOrd, 1);
} else if (v == encoded) { } else if (v == encoded) {
// k is already in the set // k is already in the set
return -1; return -1;
@ -387,48 +371,48 @@ public final class HyperLogLogPlusPlus implements Releasable {
} }
@Override @Override
protected int size() { protected int size(long bucketOrd) {
if (bucket >= sizes.size()) { if (bucketOrd >= sizes.size()) {
return 0; return 0;
} }
final int size = sizes.get(bucket); final int size = sizes.get(bucketOrd);
assert size == recomputedSize(); assert size == recomputedSize(bucketOrd);
return size; return size;
} }
@Override @Override
protected HashesIterator values() { protected HashesIterator values(long bucketOrd) {
iterator.reset(size()); iterator.reset(bucketOrd, size(bucketOrd));
return iterator; return iterator;
} }
protected Object getComparableData() { protected Object getComparableData(long bucketOrd) {
Set<Integer> values = new HashSet<>(); Set<Integer> values = new HashSet<>();
HashesIterator iteratorValues = values(); HashesIterator iteratorValues = values(bucketOrd);
while (iteratorValues.next()) { while (iteratorValues.next()) {
values.add(iteratorValues.value()); values.add(iteratorValues.value());
} }
return values; return values;
} }
private long index(int index) { private long index(long bucketOrd, int index) {
return (bucket << p) + (index << 2); return (bucketOrd << p) + (index << 2);
} }
private int get(int index) { private int get(long bucketOrd, int index) {
hll.runLens.get(index(index), 4, readSpare); hll.runLens.get(index(bucketOrd, index), 4, readSpare);
return ByteUtils.readIntLE(readSpare.bytes, readSpare.offset); return ByteUtils.readIntLE(readSpare.bytes, readSpare.offset);
} }
private void set(int index, int value) { private void set(long bucketOrd, int index, int value) {
writeSpare.putInt(0, value); writeSpare.putInt(0, value);
hll.runLens.set(index(index), writeSpare.array(), 0, 4); hll.runLens.set(index(bucketOrd, index), writeSpare.array(), 0, 4);
} }
private int recomputedSize() { private int recomputedSize(long bucketOrd) {
int size = 0; int size = 0;
for (int i = 0; i <= mask; ++i) { for (int i = 0; i <= mask; ++i) {
final int v = get(i); final int v = get(bucketOrd, i);
if (v != 0) { if (v != 0) {
++size; ++size;
} }
@ -446,8 +430,8 @@ public final class HyperLogLogPlusPlus implements Releasable {
private final LinearCounting lc; private final LinearCounting lc;
private final int capacity; private final int capacity;
int pos; private int pos;
long size; private long size, bucketOrd;
private int value; private int value;
LinearCountingIterator(LinearCounting lc, int capacity) { LinearCountingIterator(LinearCounting lc, int capacity) {
@ -455,9 +439,10 @@ public final class HyperLogLogPlusPlus implements Releasable {
this.capacity = capacity; this.capacity = capacity;
} }
void reset(long size) { void reset(long bucketOrd, long size) {
this.pos = size == 0 ? capacity : 0; this.bucketOrd = bucketOrd;
this.size = size; this.size = size;
this.pos = size == 0 ? capacity : 0;
} }
@Override @Override
@ -469,7 +454,7 @@ public final class HyperLogLogPlusPlus implements Releasable {
public boolean next() { public boolean next() {
if (pos < capacity) { if (pos < capacity) {
for (; pos < capacity; ++pos) { for (; pos < capacity; ++pos) {
final int k = lc.get(pos); final int k = lc.get(bucketOrd, pos);
if (k != 0) { if (k != 0) {
++pos; ++pos;
value = k; value = k;