Introduce a sparse HyperLogLogPlusPlus class for cloning and serializing low cardinality buckets (#62480) (#62520)

Reduces the memory footprint of an HLL++ structure that uses Linear counting when cloning or deserialising the data structure.
This commit is contained in:
Ignacio Vera 2020-09-17 08:54:50 +02:00 committed by GitHub
parent 2fd28d0944
commit 2d3ca9c155
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 553 additions and 195 deletions

View File

@ -784,17 +784,6 @@ public abstract class AbstractHyperLogLog extends AbstractCardinalityAlgorithm {
addRunLen(bucketOrd, index, runLen); addRunLen(bucketOrd, index, runLen);
} }
public void merge(long thisBucketOrd, AbstractHyperLogLog other, long otherBucketOrd) {
if (p != other.p) {
throw new IllegalArgumentException();
}
RunLenIterator iterator = other.getRunLens(otherBucketOrd);
for (int i = 0; i < m; ++i) {
iterator.next();
addRunLen(thisBucketOrd, i, iterator.value());
}
}
static long index(long hash, int p) { static long index(long hash, int p) {
return hash >>> (64 - p); return hash >>> (64 - p);
} }

View File

@ -0,0 +1,154 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
 * Common parent of the HLL++ implementations.
 *
 * Provides per-bucket cloning, serialization, equality and hashing on top of the
 * accessors that concrete implementations supply.
 */
public abstract class AbstractHyperLogLogPlusPlus extends AbstractCardinalityAlgorithm implements Releasable {

    /** Algorithm flag of a bucket that holds linear counting hashes. */
    public static final boolean LINEAR_COUNTING = false;
    /** Algorithm flag of a bucket that holds HyperLogLog run lengths. */
    public static final boolean HYPERLOGLOG = true;

    public AbstractHyperLogLogPlusPlus(int precision) {
        super(precision);
    }

    /** Algorithm used in the given bucket */
    protected abstract boolean getAlgorithm(long bucketOrd);

    /** Get linear counting algorithm */
    protected abstract AbstractLinearCounting.HashesIterator getLinearCounting(long bucketOrd);

    /** Get HyperLogLog algorithm */
    protected abstract AbstractHyperLogLog.RunLenIterator getHyperLogLog(long bucketOrd);

    /** Get the number of data structures */
    public abstract long maxOrd();

    /** Collect a value in the given bucket */
    public abstract void collect(long bucketOrd, long hash);

    /**
     * Copies the content of one bucket into a new single-bucket structure.
     *
     * A linear counting bucket is copied into a {@link HyperLogLogPlusPlusSparse} sized to the
     * number of hashes it holds; any other bucket is merged into a fresh {@link HyperLogLogPlusPlus}.
     *
     * @param bucketOrd the bucket to copy
     * @param bigArrays allocator used for the copy
     * @return a new structure holding the copied data in bucket 0
     */
    public AbstractHyperLogLogPlusPlus clone(long bucketOrd, BigArrays bigArrays) {
        if (getAlgorithm(bucketOrd) != LINEAR_COUNTING) {
            final HyperLogLogPlusPlus dense = new HyperLogLogPlusPlus(precision(), bigArrays, 1);
            dense.merge(0, this, bucketOrd);
            return dense;
        }
        // linear counting buckets go into the sparse structure
        final AbstractLinearCounting.HashesIterator hashes = getLinearCounting(bucketOrd);
        final int capacity = Math.toIntExact(hashes.size());
        final HyperLogLogPlusPlusSparse sparse = new HyperLogLogPlusPlusSparse(precision(), bigArrays, capacity, 1);
        while (hashes.next()) {
            sparse.addEncoded(0, hashes.value());
        }
        return sparse;
    }

    /**
     * Builds an order-independent view of a bucket used by {@link #equals(long, AbstractHyperLogLogPlusPlus, long)}
     * and {@link #hashCode(long)}: the set of encoded hashes for linear counting, or a
     * run-length to number-of-occurrences map for HyperLogLog.
     */
    private Object getComparableData(long bucketOrd) {
        if (getAlgorithm(bucketOrd) != LINEAR_COUNTING) {
            final Map<Byte, Integer> histogram = new HashMap<>();
            final AbstractHyperLogLog.RunLenIterator runLens = getHyperLogLog(bucketOrd);
            while (runLens.next()) {
                final byte runLength = runLens.value();
                histogram.merge(runLength, 1, Integer::sum);
            }
            return histogram;
        }
        final Set<Integer> encodedHashes = new HashSet<>();
        final AbstractLinearCounting.HashesIterator hashes = getLinearCounting(bucketOrd);
        while (hashes.next()) {
            encodedHashes.add(hashes.value());
        }
        return encodedHashes;
    }

    /**
     * Serializes one bucket: the precision, the algorithm flag, and then either the number of
     * hashes followed by each hash (linear counting) or every run length (HyperLogLog).
     *
     * @param bucket the bucket to write
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void writeTo(long bucket, StreamOutput out) throws IOException {
        out.writeVInt(precision());
        if (getAlgorithm(bucket) != LINEAR_COUNTING) {
            out.writeBoolean(HYPERLOGLOG);
            final AbstractHyperLogLog.RunLenIterator runLens = getHyperLogLog(bucket);
            while (runLens.next()) {
                out.writeByte(runLens.value());
            }
            return;
        }
        out.writeBoolean(LINEAR_COUNTING);
        final AbstractLinearCounting.HashesIterator encoded = getLinearCounting(bucket);
        out.writeVLong(encoded.size());
        while (encoded.next()) {
            out.writeInt(encoded.value());
        }
    }

    /**
     * Reads a bucket written by {@link #writeTo(long, StreamOutput)} into a new single-bucket structure.
     *
     * @param in the stream to read from
     * @param bigArrays allocator used for the new structure
     * @return a sparse structure for linear counting data, a {@link HyperLogLogPlusPlus} otherwise
     * @throws IOException if reading from the stream fails
     */
    public static AbstractHyperLogLogPlusPlus readFrom(StreamInput in, BigArrays bigArrays) throws IOException {
        final int precision = in.readVInt();
        final boolean algorithm = in.readBoolean();
        if (algorithm != LINEAR_COUNTING) {
            final HyperLogLogPlusPlus dense = new HyperLogLogPlusPlus(precision, bigArrays, 1);
            final int registers = 1 << precision;
            for (int register = 0; register < registers; ++register) {
                dense.addRunLen(0, register, in.readByte());
            }
            return dense;
        }
        // linear counting data is read into the sparse structure
        final long numHashes = in.readVLong();
        final HyperLogLogPlusPlusSparse sparse =
            new HyperLogLogPlusPlusSparse(precision, bigArrays, Math.toIntExact(numHashes), 1);
        for (long read = 0; read < numHashes; ++read) {
            sparse.addEncoded(0, in.readInt());
        }
        return sparse;
    }

    /**
     * Compares one bucket of this structure with one bucket of another structure: precision,
     * algorithm and comparable data must all match.
     */
    public boolean equals(long thisBucket, AbstractHyperLogLogPlusPlus other, long otherBucket) {
        if (precision() != other.precision()) {
            return false;
        }
        if (getAlgorithm(thisBucket) != other.getAlgorithm(otherBucket)) {
            return false;
        }
        return getComparableData(thisBucket).equals(other.getComparableData(otherBucket));
    }

    /** Hash code of a single bucket, built from the same components that {@code equals} compares. */
    public int hashCode(long bucket) {
        return Objects.hash(precision(), getAlgorithm(bucket), getComparableData(bucket));
    }
}

View File

@ -59,6 +59,7 @@ public abstract class AbstractLinearCounting extends AbstractCardinalityAlgorith
return addEncoded(bucketOrd, k); return addEncoded(bucketOrd, k);
} }
@Override
public long cardinality(long bucketOrd) { public long cardinality(long bucketOrd) {
final long m = 1 << P2; final long m = 1 << P2;
final long v = m - size(bucketOrd); final long v = m - size(bucketOrd);
@ -92,7 +93,7 @@ public abstract class AbstractLinearCounting extends AbstractCardinalityAlgorith
/** /**
* number of elements in the iterator * number of elements in the iterator
*/ */
long size(); int size();
/** /**
* Moves the iterator to the next element if it exists. * Moves the iterator to the next element if it exists.

View File

@ -157,13 +157,12 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
@Override @Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) { public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (counts == null || owningBucketOrdinal >= counts.maxBucket() || counts.cardinality(owningBucketOrdinal) == 0) { if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
return buildEmptyAggregation(); return buildEmptyAggregation();
} }
// We need to build a copy because the returned Aggregation needs remain usable after // We need to build a copy because the returned Aggregation needs remain usable after
// this Aggregator (and its HLL++ counters) is released. // this Aggregator (and its HLL++ counters) is released.
HyperLogLogPlusPlus copy = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); AbstractHyperLogLogPlusPlus copy = counts.clone(owningBucketOrdinal, BigArrays.NON_RECYCLING_INSTANCE);
copy.merge(0, counts, owningBucketOrdinal);
return new InternalCardinality(name, copy, metadata()); return new InternalCardinality(name, copy, metadata());
} }

View File

@ -21,8 +21,6 @@ package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -31,14 +29,8 @@ import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.ByteUtils; import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.IntArray;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/** /**
* Hyperloglog++ counter, implemented based on pseudo code from * Hyperloglog++ counter, implemented based on pseudo code from
@ -56,11 +48,10 @@ import java.util.Set;
* *
* It supports storing several HyperLogLogPlusPlus structures which are identified by a bucket number. * It supports storing several HyperLogLogPlusPlus structures which are identified by a bucket number.
*/ */
public final class HyperLogLogPlusPlus implements Releasable { public final class HyperLogLogPlusPlus extends AbstractHyperLogLogPlusPlus {
private static final float MAX_LOAD_FACTOR = 0.75f; private static final float MAX_LOAD_FACTOR = 0.75f;
private static final boolean LINEAR_COUNTING = false;
private static final boolean HYPERLOGLOG = true;
public static final int DEFAULT_PRECISION = 14; public static final int DEFAULT_PRECISION = 14;
private final BitArray algorithm; private final BitArray algorithm;
@ -85,35 +76,103 @@ public final class HyperLogLogPlusPlus implements Releasable {
return 1L << precision; return 1L << precision;
} }
public HyperLogLogPlusPlus(int precision, BigArrays bigArrays, long initialBucketCount) { public HyperLogLogPlusPlus(int precision, BigArrays bigArrays, long initialBucketCount) {
super(precision);
hll = new HyperLogLog(bigArrays, initialBucketCount, precision); hll = new HyperLogLog(bigArrays, initialBucketCount, precision);
lc = new LinearCounting(bigArrays, initialBucketCount, precision, hll); lc = new LinearCounting(bigArrays, initialBucketCount, precision, hll);
algorithm = new BitArray(1, bigArrays); algorithm = new BitArray(1, bigArrays);
} }
public int precision() { @Override
return hll.precision(); public long maxOrd() {
}
public long maxBucket() {
return hll.runLens.size() >>> hll.precision(); return hll.runLens.size() >>> hll.precision();
} }
public void merge(long thisBucket, HyperLogLogPlusPlus other, long otherBucket) { @Override
public long cardinality(long bucketOrd) {
    // delegate to whichever algorithm currently backs this bucket
    final boolean usesLinearCounting = getAlgorithm(bucketOrd) == LINEAR_COUNTING;
    return usesLinearCounting ? lc.cardinality(bucketOrd) : hll.cardinality(bucketOrd);
}
/** Returns the flag stored for the bucket in the {@code algorithm} bit array. */
@Override
protected boolean getAlgorithm(long bucketOrd) {
return algorithm.get(bucketOrd);
}
/** Returns the iterator over the linear counting values of the bucket, as provided by {@code lc}. */
@Override
protected AbstractLinearCounting.HashesIterator getLinearCounting(long bucketOrd) {
return lc.values(bucketOrd);
}
/** Returns the iterator over the run lengths of the bucket, as provided by {@code hll}. */
@Override
protected AbstractHyperLogLog.RunLenIterator getHyperLogLog(long bucketOrd) {
return hll.getRunLens(bucketOrd);
}
/**
 * Adds a hash to the given bucket. A bucket that still uses linear counting is upgraded to
 * HyperLogLog once its size exceeds the linear counting threshold.
 */
@Override
public void collect(long bucket, long hash) {
    hll.ensureCapacity(bucket + 1);
    if (algorithm.get(bucket) != LINEAR_COUNTING) {
        hll.collect(bucket, hash);
        return;
    }
    final int sizeAfterInsert = lc.collect(bucket, hash);
    if (sizeAfterInsert > lc.threshold) {
        upgradeToHll(bucket);
    }
}
/** Releases the algorithm flags together with both backing structures. */
@Override
public void close() {
Releasables.close(algorithm, hll, lc);
}
/**
 * Records a run length for a register of the given bucket. A bucket that still uses
 * linear counting is upgraded to HyperLogLog first.
 *
 * @param bucketOrd the bucket to update
 * @param register the register index within the bucket
 * @param runLen the run length to record
 */
protected void addRunLen(long bucketOrd, int register, int runLen) {
    if (algorithm.get(bucketOrd) == LINEAR_COUNTING) {
        upgradeToHll(bucketOrd);
    }
    // Write to the requested bucket: this previously always targeted bucket 0,
    // which was only correct for callers passing bucketOrd == 0.
    hll.addRunLen(bucketOrd, register, runLen);
}
/**
 * Switches a bucket from linear counting to HyperLogLog: the hashes collected so far are
 * copied aside, the bucket is reset, the hashes are replayed into the HyperLogLog structure
 * and the bucket's algorithm flag is set.
 */
void upgradeToHll(long bucketOrd) {
hll.ensureCapacity(bucketOrd + 1);
final AbstractLinearCounting.HashesIterator hashes = lc.values(bucketOrd);
// The hashes must be copied into a temporary array first, because the
// reset and the writes below override the values on the buffer.
final IntArray values = lc.bigArrays.newIntArray(hashes.size());
try {
int i = 0;
while (hashes.next()) {
values.set(i++, hashes.value());
}
// sanity check: the iterator produced exactly as many values as it announced
assert i == hashes.size();
hll.reset(bucketOrd);
for (long j = 0; j < values.size(); ++j) {
final int encoded = values.get(j);
hll.collectEncoded(bucketOrd, encoded);
}
// mark the bucket as HyperLogLog only after every hash has been replayed
algorithm.set(bucketOrd);
} finally {
// the temporary copy is always released, even if the replay fails
Releasables.close(values);
}
}
public void merge(long thisBucket, AbstractHyperLogLogPlusPlus other, long otherBucket) {
if (precision() != other.precision()) { if (precision() != other.precision()) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
hll.ensureCapacity(thisBucket + 1); hll.ensureCapacity(thisBucket + 1);
if (other.algorithm.get(otherBucket) == LINEAR_COUNTING) { if (other.getAlgorithm(otherBucket) == LINEAR_COUNTING) {
merge(thisBucket, other.lc, otherBucket); merge(thisBucket, other.getLinearCounting(otherBucket));
} else { } else {
merge(thisBucket, other.hll, otherBucket); merge(thisBucket, other.getHyperLogLog(otherBucket));
} }
} }
private void merge(long thisBucket, AbstractLinearCounting other, long otherBucket) { private void merge(long thisBucket, AbstractLinearCounting.HashesIterator values) {
final AbstractLinearCounting.HashesIterator values = other.values(otherBucket);
while (values.next()) { while (values.next()) {
final int encoded = values.value(); final int encoded = values.value();
if (algorithm.get(thisBucket) == LINEAR_COUNTING) { if (algorithm.get(thisBucket) == LINEAR_COUNTING) {
@ -127,124 +186,23 @@ public final class HyperLogLogPlusPlus implements Releasable {
} }
} }
private void merge(long thisBucket, AbstractHyperLogLog other, long otherBucket) { private void merge(long thisBucket, AbstractHyperLogLog.RunLenIterator runLens) {
if (algorithm.get(thisBucket) != HYPERLOGLOG) { if (algorithm.get(thisBucket) != HYPERLOGLOG) {
upgradeToHll(thisBucket); upgradeToHll(thisBucket);
} }
hll.merge(thisBucket, other, otherBucket); for (int i = 0; i < hll.m; ++i) {
runLens.next();
hll.addRunLen(thisBucket, i, runLens.value());
} }
public void collect(long bucket, long hash) {
hll.ensureCapacity(bucket + 1);
if (algorithm.get(bucket) == LINEAR_COUNTING) {
final int newSize = lc.collect(bucket, hash);
if (newSize > lc.threshold) {
upgradeToHll(bucket);
}
} else {
hll.collect(bucket, hash);
}
}
public long cardinality(long bucket) {
if (algorithm.get(bucket) == LINEAR_COUNTING) {
return lc.cardinality(bucket);
} else {
return hll.cardinality(bucket);
}
}
void upgradeToHll(long bucket) {
hll.ensureCapacity(bucket + 1);
final AbstractLinearCounting.HashesIterator hashes = lc.values(bucket);
// We need to copy values into an arrays as we will override
// the values on the buffer
final IntArray values = lc.bigArrays.newIntArray(hashes.size());
try {
int i = 0;
while (hashes.next()) {
values.set(i++, hashes.value());
}
assert i == hashes.size();
hll.reset(bucket);
for (long j = 0; j < values.size(); ++j) {
final int encoded = values.get(j);
hll.collectEncoded(bucket, encoded);
}
algorithm.set(bucket);
} finally {
Releasables.close(values);
}
}
@Override
public void close() {
Releasables.close(algorithm, hll, lc);
}
private Object getComparableData(long bucket) {
if (algorithm.get(bucket) == LINEAR_COUNTING) {
return lc.getComparableData(bucket);
} else {
return hll.getComparableData(bucket);
}
}
public int hashCode(long bucket) {
return Objects.hash(precision(), algorithm.get(bucket), getComparableData(bucket));
}
public boolean equals(long bucket, HyperLogLogPlusPlus other) {
return Objects.equals(precision(), other.precision())
&& Objects.equals(algorithm.get(bucket), other.algorithm.get(bucket))
&& Objects.equals(getComparableData(bucket), other.getComparableData(bucket));
}
public void writeTo(long bucket, StreamOutput out) throws IOException {
out.writeVInt(precision());
if (algorithm.get(bucket) == LINEAR_COUNTING) {
out.writeBoolean(LINEAR_COUNTING);
AbstractLinearCounting.HashesIterator hashes = lc.values(bucket);
out.writeVLong(hashes.size());
while (hashes.next()) {
out.writeInt(hashes.value());
}
} else {
out.writeBoolean(HYPERLOGLOG);
AbstractHyperLogLog.RunLenIterator iterator = hll.getRunLens(bucket);
while (iterator.next()){
out.writeByte(iterator.value());
}
}
}
public static HyperLogLogPlusPlus readFrom(StreamInput in, BigArrays bigArrays) throws IOException {
final int precision = in.readVInt();
HyperLogLogPlusPlus counts = new HyperLogLogPlusPlus(precision, bigArrays, 1);
final boolean algorithm = in.readBoolean();
if (algorithm == LINEAR_COUNTING) {
counts.algorithm.clear(0);
final long size = in.readVLong();
for (long i = 0; i < size; ++i) {
final int encoded = in.readInt();
counts.lc.addEncoded(0, encoded);
}
} else {
counts.algorithm.set(0);
for (int i = 0; i < counts.hll.m; ++i) {
counts.hll.addRunLen(0, i, in.readByte());
}
}
return counts;
} }
private static class HyperLogLog extends AbstractHyperLogLog implements Releasable { private static class HyperLogLog extends AbstractHyperLogLog implements Releasable {
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final HyperLogLogIterator iterator; private final HyperLogLogIterator iterator;
// array for holding the runlens. // array for holding the runlens.
private ByteArray runLens; private ByteArray runLens;
HyperLogLog(BigArrays bigArrays, long initialBucketCount, int precision) { HyperLogLog(BigArrays bigArrays, long initialBucketCount, int precision) {
super(precision); super(precision);
this.runLens = bigArrays.newByteArray(initialBucketCount << precision); this.runLens = bigArrays.newByteArray(initialBucketCount << precision);
@ -268,20 +226,6 @@ public final class HyperLogLogPlusPlus implements Releasable {
runLens.fill(bucketOrd << p, (bucketOrd << p) + m, (byte) 0); runLens.fill(bucketOrd << p, (bucketOrd << p) + m, (byte) 0);
} }
protected Object getComparableData(long bucketOrd) {
Map<Byte, Integer> values = new HashMap<>();
for (long i = 0; i < runLens.size(); i++) {
byte runLength = runLens.get((bucketOrd << p) + i);
Integer numOccurances = values.get(runLength);
if (numOccurances == null) {
values.put(runLength, 1);
} else {
values.put(runLength, numOccurances + 1);
}
}
return values;
}
protected void ensureCapacity(long numBuckets) { protected void ensureCapacity(long numBuckets) {
runLens = bigArrays.grow(runLens, numBuckets << p); runLens = bigArrays.grow(runLens, numBuckets << p);
} }
@ -296,8 +240,8 @@ public final class HyperLogLogPlusPlus implements Releasable {
private final HyperLogLog hll; private final HyperLogLog hll;
private final int m, p; private final int m, p;
private int pos; int pos;
private long start; long start;
private byte value; private byte value;
HyperLogLogIterator(HyperLogLog hll, int p, int m) { HyperLogLogIterator(HyperLogLog hll, int p, int m) {
@ -386,15 +330,6 @@ public final class HyperLogLogPlusPlus implements Releasable {
return iterator; return iterator;
} }
protected Object getComparableData(long bucketOrd) {
Set<Integer> values = new HashSet<>();
HashesIterator iteratorValues = values(bucketOrd);
while (iteratorValues.next()) {
values.add(iteratorValues.value());
}
return values;
}
private long index(long bucketOrd, int index) { private long index(long bucketOrd, int index) {
return (bucketOrd << p) + (index << 2); return (bucketOrd << p) + (index << 2);
} }
@ -430,8 +365,8 @@ public final class HyperLogLogPlusPlus implements Releasable {
private final LinearCounting lc; private final LinearCounting lc;
private final int capacity; private final int capacity;
private int pos; private int pos, size;
private long size, bucketOrd; private long bucketOrd;
private int value; private int value;
LinearCountingIterator(LinearCounting lc, int capacity) { LinearCountingIterator(LinearCounting lc, int capacity) {
@ -439,14 +374,14 @@ public final class HyperLogLogPlusPlus implements Releasable {
this.capacity = capacity; this.capacity = capacity;
} }
void reset(long bucketOrd, long size) { void reset(long bucketOrd, int size) {
this.bucketOrd = bucketOrd; this.bucketOrd = bucketOrd;
this.size = size; this.size = size;
this.pos = size == 0 ? capacity : 0; this.pos = size == 0 ? capacity : 0;
} }
@Override @Override
public long size() { public int size() {
return size; return size;
} }

View File

@ -0,0 +1,205 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
/**
 * AbstractHyperLogLogPlusPlus instance that only supports linear counting. The maximum number of hashes supported
 * by the structure is determined at construction time.
 *
 * This structure expects all the added values to be distinct and therefore there are no checks
 * if an element has been previously added.
 */
final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implements Releasable {

    private final LinearCounting lc;

    /**
     * Create a sparse HLL++ algorithm where capacity is the maximum number of hashes this structure can hold
     * per bucket.
     *
     * @param precision the precision handed to the parent class and to the linear counting structure
     * @param bigArrays allocator for the backing arrays
     * @param capacity maximum number of hashes per bucket
     * @param initialSize number of buckets to allocate up front
     */
    HyperLogLogPlusPlusSparse(int precision, BigArrays bigArrays, int capacity, int initialSize) {
        super(precision);
        this.lc = new LinearCounting(precision, bigArrays, capacity, initialSize);
    }

    /** Number of buckets for which a size counter is currently allocated. */
    @Override
    public long maxOrd() {
        return lc.sizes.size();
    }

    @Override
    public long cardinality(long bucketOrd) {
        return lc.cardinality(bucketOrd);
    }

    /** Every bucket of this structure uses linear counting. */
    @Override
    protected boolean getAlgorithm(long bucketOrd) {
        return LINEAR_COUNTING;
    }

    @Override
    protected AbstractLinearCounting.HashesIterator getLinearCounting(long bucketOrd) {
        return lc.values(bucketOrd);
    }

    /**
     * Always fails: this structure never holds HyperLogLog data.
     *
     * @throws IllegalArgumentException always
     */
    @Override
    protected AbstractHyperLogLog.RunLenIterator getHyperLogLog(long bucketOrd) {
        throw new IllegalArgumentException("Implementation does not support HLL structures");
    }

    @Override
    public void collect(long bucket, long hash) {
        lc.collect(bucket, hash);
    }

    @Override
    public void close() {
        Releasables.close(lc);
    }

    /** Appends an already encoded hash to the given bucket. */
    protected void addEncoded(long bucket, int encoded) {
        lc.addEncoded(bucket, encoded);
    }

    /**
     * Linear counting storage: each bucket owns a slice of {@code capacity} consecutive slots in
     * {@code values}, and {@code sizes} records how many of those slots are in use.
     */
    private static class LinearCounting extends AbstractLinearCounting implements Releasable {

        private final int capacity;
        private final BigArrays bigArrays;
        private final LinearCountingIterator iterator;
        // Encoded hashes, stored back to back: bucket b occupies [b * capacity, (b + 1) * capacity).
        private IntArray values;
        // Number of elements stored, one entry per bucket.
        private IntArray sizes;

        LinearCounting(int p, BigArrays bigArrays, int capacity, int initialSize) {
            super(p);
            this.bigArrays = bigArrays;
            this.capacity = capacity;
            // widen before multiplying so that initialSize * capacity cannot overflow an int
            values = bigArrays.newIntArray((long) initialSize * capacity);
            // one counter per bucket is enough; sizing it like `values` wasted memory
            // and made maxOrd() report more buckets than were allocated
            sizes = bigArrays.newIntArray(initialSize);
            iterator = new LinearCountingIterator(this, capacity);
        }

        @Override
        protected int addEncoded(long bucketOrd, int encoded) {
            // 0 marks an unused slot (see recomputedSize), so it cannot be stored as a value
            assert encoded != 0;
            return set(bucketOrd, encoded);
        }

        @Override
        protected int size(long bucketOrd) {
            if (bucketOrd >= sizes.size()) {
                return 0;
            }
            final int size = sizes.get(bucketOrd);
            assert size == recomputedSize(bucketOrd);
            return size;
        }

        /** Returns the shared iterator, repositioned on the given bucket. */
        @Override
        protected HashesIterator values(long bucketOrd) {
            iterator.reset(bucketOrd, size(bucketOrd));
            return iterator;
        }

        private long index(long bucketOrd, int index) {
            return (bucketOrd * capacity) + index;
        }

        /** Reads a slot of a bucket, treating slots beyond the allocated array as empty. */
        private int get(long bucketOrd, int index) {
            long globalIndex = index(bucketOrd, index);
            // valid indices are [0, size), so an index equal to the size is already out of range
            if (globalIndex >= values.size()) {
                return 0;
            }
            return values.get(globalIndex);
        }

        /**
         * Appends a value to the bucket and returns the new number of elements in it.
         * NOTE(review): nothing here checks that the bucket stays within {@code capacity};
         * callers are expected to respect the limit given at construction time.
         */
        private int set(long bucketOrd, int value) {
            int size = size(bucketOrd);
            if (size == 0) {
                // first value of the bucket: make sure both arrays cover it
                sizes = bigArrays.grow(sizes, bucketOrd + 1);
                values = bigArrays.grow(values, (bucketOrd * capacity) + capacity);
            }
            values.set(index(bucketOrd, size), value);
            return sizes.increment(bucketOrd, 1);
        }

        /** Counts the used slots of a bucket by scanning for the first empty (0) slot; used by assertions. */
        private int recomputedSize(long bucketOrd) {
            for (int i = 0; i < capacity; ++i) {
                final int v = get(bucketOrd, i);
                if (v == 0) {
                    return i;
                }
            }
            return capacity;
        }

        @Override
        public void close() {
            Releasables.close(values, sizes);
        }
    }

    /** Iterates over the used slots of one bucket of a {@link LinearCounting}. */
    private static class LinearCountingIterator implements AbstractLinearCounting.HashesIterator {

        private final LinearCounting lc;
        private final int capacity;
        long start;
        long end;
        private int value, size;
        private long pos;

        LinearCountingIterator(LinearCounting lc, int capacity) {
            this.lc = lc;
            this.capacity = capacity;
        }

        /** Positions the iterator on the first slot of the bucket; it will return {@code size} values. */
        void reset(long bucketOrd, int size) {
            this.start = bucketOrd * capacity;
            this.size = size;
            this.end = start + size;
            this.pos = start;
        }

        @Override
        public int size() {
            return size;
        }

        @Override
        public boolean next() {
            if (pos < end) {
                value = lc.values.get(pos++);
                return true;
            }
            return false;
        }

        @Override
        public int value() {
            return value;
        }
    }
}

View File

@ -32,9 +32,9 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
public final class InternalCardinality extends InternalNumericMetricsAggregation.SingleValue implements Cardinality { public final class InternalCardinality extends InternalNumericMetricsAggregation.SingleValue implements Cardinality {
private final HyperLogLogPlusPlus counts; private final AbstractHyperLogLogPlusPlus counts;
InternalCardinality(String name, HyperLogLogPlusPlus counts, Map<String, Object> metadata) { InternalCardinality(String name, AbstractHyperLogLogPlusPlus counts, Map<String, Object> metadata) {
super(name, metadata); super(name, metadata);
this.counts = counts; this.counts = counts;
} }
@ -46,7 +46,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
super(in); super(in);
format = in.readNamedWriteable(DocValueFormat.class); format = in.readNamedWriteable(DocValueFormat.class);
if (in.readBoolean()) { if (in.readBoolean()) {
counts = HyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE); counts = AbstractHyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE);
} else { } else {
counts = null; counts = null;
} }
@ -78,34 +78,30 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
return counts == null ? 0 : counts.cardinality(0); return counts == null ? 0 : counts.cardinality(0);
} }
public HyperLogLogPlusPlus getCounts() { public AbstractHyperLogLogPlusPlus getCounts() {
return counts; return counts;
} }
@Override @Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
InternalCardinality reduced = null; HyperLogLogPlusPlus reduced = null;
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
final InternalCardinality cardinality = (InternalCardinality) aggregation; final InternalCardinality cardinality = (InternalCardinality) aggregation;
if (cardinality.counts != null) { if (cardinality.counts != null) {
if (reduced == null) { if (reduced == null) {
reduced = new InternalCardinality(name, new HyperLogLogPlusPlus(cardinality.counts.precision(), reduced = new HyperLogLogPlusPlus(cardinality.counts.precision(),
BigArrays.NON_RECYCLING_INSTANCE, 1), getMetadata()); BigArrays.NON_RECYCLING_INSTANCE, 1);
} }
reduced.merge(cardinality); reduced.merge(0, cardinality.counts, 0);
} }
} }
if (reduced == null) { // all empty if (reduced == null) { // all empty
return aggregations.get(0); return aggregations.get(0);
} else { } else {
return reduced; return new InternalCardinality(name, reduced, getMetadata());
}
}
public void merge(InternalCardinality other) { }
assert counts != null && other != null;
counts.merge(0, other.counts, 0);
} }
@Override @Override
@ -127,10 +123,10 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
if (super.equals(obj) == false) return false; if (super.equals(obj) == false) return false;
InternalCardinality other = (InternalCardinality) obj; InternalCardinality other = (InternalCardinality) obj;
return counts.equals(0, other.counts); return counts.equals(0, other.counts, 0);
} }
HyperLogLogPlusPlus getState() { AbstractHyperLogLogPlusPlus getState() {
return counts; return counts;
} }
} }

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import com.carrotsearch.hppc.BitMixer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MAX_PRECISION;
import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MIN_PRECISION;
/**
 * Checks that cloning, serializing and merging a bucket of a {@link HyperLogLogPlusPlus}
 * produce structures equivalent to the original bucket.
 */
public class HyperLogLogPlusPlusSparseTests extends ESTestCase {

    public void testEquivalence() throws IOException {
        final int p = randomIntBetween(MIN_PRECISION, MAX_PRECISION);
        final HyperLogLogPlusPlus single = new HyperLogLogPlusPlus(p, BigArrays.NON_RECYCLING_INSTANCE, 0);
        final int numBuckets = randomIntBetween(2, 100);
        final int numValues = randomIntBetween(1, 100000);
        final int maxValue = randomIntBetween(1, randomBoolean() ? 1000 : 1000000);
        for (int i = 0; i < numValues; ++i) {
            final int n = randomInt(maxValue);
            final long hash = BitMixer.mix64(n);
            // randomInt is inclusive of its bound: stay within [0, numBuckets) so that
            // every populated bucket is covered by the verification loop below
            single.collect(randomInt(numBuckets - 1), hash);
        }
        for (int i = 0; i < numBuckets; i++) {
            // test clone
            AbstractHyperLogLogPlusPlus clone = single.clone(i, BigArrays.NON_RECYCLING_INSTANCE);
            assertExpectedImplementation(single, i, clone);
            checkEquivalence(single, i, clone, 0);
            // test serialize
            BytesStreamOutput out = new BytesStreamOutput();
            single.writeTo(i, out);
            clone = AbstractHyperLogLogPlusPlus.readFrom(out.bytes().streamInput(), BigArrays.NON_RECYCLING_INSTANCE);
            assertExpectedImplementation(single, i, clone);
            checkEquivalence(single, i, clone, 0);
            // test merge
            final HyperLogLogPlusPlus merge = new HyperLogLogPlusPlus(p, BigArrays.NON_RECYCLING_INSTANCE, 0);
            merge.merge(0, clone, 0);
            checkEquivalence(merge, 0, clone, 0);
        }
    }

    /** A linear counting bucket must be copied into the sparse implementation, any other bucket into the regular one. */
    private void assertExpectedImplementation(HyperLogLogPlusPlus source, int bucket, AbstractHyperLogLogPlusPlus copy) {
        if (source.getAlgorithm(bucket) == AbstractHyperLogLogPlusPlus.LINEAR_COUNTING) {
            assertTrue(copy instanceof HyperLogLogPlusPlusSparse);
        } else {
            assertTrue(copy instanceof HyperLogLogPlusPlus);
        }
    }

    /** Asserts that the two buckets agree on hash code, cardinality and equality (in both directions). */
    private void checkEquivalence(AbstractHyperLogLogPlusPlus first, int firstBucket,
                                  AbstractHyperLogLogPlusPlus second, int secondBucket) {
        assertEquals(first.hashCode(firstBucket), second.hashCode(secondBucket));
        // compare against the requested bucket of the second structure, not a hard-coded bucket 0
        assertEquals(first.cardinality(firstBucket), second.cardinality(secondBucket));
        assertTrue(first.equals(firstBucket, second, secondBucket));
        assertTrue(second.equals(secondBucket, first, firstBucket));
    }
}

View File

@ -90,7 +90,7 @@ public class InternalCardinalityTests extends InternalAggregationTestCase<Intern
@Override @Override
protected InternalCardinality mutateInstance(InternalCardinality instance) { protected InternalCardinality mutateInstance(InternalCardinality instance) {
String name = instance.getName(); String name = instance.getName();
HyperLogLogPlusPlus state = instance.getState(); AbstractHyperLogLogPlusPlus state = instance.getState();
Map<String, Object> metadata = instance.getMetadata(); Map<String, Object> metadata = instance.getMetadata();
switch (between(0, 2)) { switch (between(0, 2)) {
case 0: case 0:

View File

@ -16,6 +16,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus;
import org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus; import org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus;
import org.elasticsearch.search.aggregations.metrics.InternalCardinality; import org.elasticsearch.search.aggregations.metrics.InternalCardinality;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
@ -68,7 +69,7 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator
try { try {
long cardinality = 0; long cardinality = 0;
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
HyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]); AbstractHyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]);
if (hll == null && bucketHll != null) { if (hll == null && bucketHll != null) {
// We have to create a new HLL because otherwise it will alter the // We have to create a new HLL because otherwise it will alter the
// existing cardinality sketch and bucket value // existing cardinality sketch and bucket value
@ -94,7 +95,7 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator
} }
} }
private HyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg, private AbstractHyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg,
InternalMultiBucketAggregation.InternalBucket bucket, InternalMultiBucketAggregation.InternalBucket bucket,
String aggPath) { String aggPath) {
List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList(); List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();