diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLog.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLog.java index 7e38ec73166..d2c9a8b1b82 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLog.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLog.java @@ -784,17 +784,6 @@ public abstract class AbstractHyperLogLog extends AbstractCardinalityAlgorithm { addRunLen(bucketOrd, index, runLen); } - public void merge(long thisBucketOrd, AbstractHyperLogLog other, long otherBucketOrd) { - if (p != other.p) { - throw new IllegalArgumentException(); - } - RunLenIterator iterator = other.getRunLens(otherBucketOrd); - for (int i = 0; i < m; ++i) { - iterator.next(); - addRunLen(thisBucketOrd, i, iterator.value()); - } - } - static long index(long hash, int p) { return hash >>> (64 - p); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLogPlusPlus.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLogPlusPlus.java new file mode 100644 index 00000000000..c151ee288e9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHyperLogLogPlusPlus.java @@ -0,0 +1,154 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.BigArrays; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Base class for HLL++ algorithms. + * + * It contains methods for cloning and serializing the data structure. + */ +public abstract class AbstractHyperLogLogPlusPlus extends AbstractCardinalityAlgorithm implements Releasable { + + public static final boolean LINEAR_COUNTING = false; + public static final boolean HYPERLOGLOG = true; + + public AbstractHyperLogLogPlusPlus(int precision) { + super(precision); + } + + /** Algorithm used in the given bucket */ + protected abstract boolean getAlgorithm(long bucketOrd); + + /** Get linear counting algorithm */ + protected abstract AbstractLinearCounting.HashesIterator getLinearCounting(long bucketOrd); + + /** Get HyperLogLog algorithm */ + protected abstract AbstractHyperLogLog.RunLenIterator getHyperLogLog(long bucketOrd); + + /** Get the number of data structures */ + public abstract long maxOrd(); + + /** Collect a value in the given bucket */ + public abstract void collect(long bucketOrd, long hash); + + + /** Clone the data structure at the given bucket */ + public AbstractHyperLogLogPlusPlus clone(long bucketOrd, BigArrays bigArrays) { + if (getAlgorithm(bucketOrd) == LINEAR_COUNTING) { + // we use a sparse structure for linear counting + AbstractLinearCounting.HashesIterator iterator = getLinearCounting(bucketOrd); + int size = Math.toIntExact(iterator.size()); + HyperLogLogPlusPlusSparse clone = new HyperLogLogPlusPlusSparse(precision(), bigArrays, size, 1); + while (iterator.next()) { + clone.addEncoded(0, iterator.value()); + } + return clone; + } else { + HyperLogLogPlusPlus clone = new HyperLogLogPlusPlus(precision(), bigArrays, 1); + clone.merge(0, this, bucketOrd); + return clone; + } + } + + private Object getComparableData(long bucketOrd) { + if (getAlgorithm(bucketOrd) == LINEAR_COUNTING) { + Set values = new HashSet<>(); + AbstractLinearCounting.HashesIterator iteratorValues = getLinearCounting(bucketOrd); + while (iteratorValues.next()) { + values.add(iteratorValues.value()); + } + return values; + } else { + Map values = new HashMap<>(); + AbstractHyperLogLog.RunLenIterator iterator = getHyperLogLog(bucketOrd); + while (iterator.next()) { + byte runLength = iterator.value(); + Integer numOccurances = values.get(runLength); + if (numOccurances == null) { + values.put(runLength, 1); + } else { + values.put(runLength, numOccurances + 1); + } + } + return values; + } + } + + public void writeTo(long bucket, StreamOutput out) throws IOException { + out.writeVInt(precision()); + if (getAlgorithm(bucket) == LINEAR_COUNTING) { + out.writeBoolean(LINEAR_COUNTING); + AbstractLinearCounting.HashesIterator hashes = getLinearCounting(bucket); + out.writeVLong(hashes.size()); + while (hashes.next()) { + out.writeInt(hashes.value()); + } + } else { + out.writeBoolean(HYPERLOGLOG); + AbstractHyperLogLog.RunLenIterator iterator = getHyperLogLog(bucket); + while (iterator.next()){ + out.writeByte(iterator.value()); + } + } + } + + public static AbstractHyperLogLogPlusPlus readFrom(StreamInput in, BigArrays bigArrays) throws IOException { + final int precision = in.readVInt(); + final boolean algorithm = in.readBoolean(); + if (algorithm == LINEAR_COUNTING) { + // we use a sparse structure for linear counting + final long size = in.readVLong(); + HyperLogLogPlusPlusSparse counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, Math.toIntExact(size), 1); + for (long i = 0; i < size; ++i) { + counts.addEncoded(0, in.readInt()); + } + return counts; + } else { + HyperLogLogPlusPlus counts = new HyperLogLogPlusPlus(precision, bigArrays, 1); + final int registers = 1 << precision; + for (int i = 0; i < registers; ++i) { + counts.addRunLen(0, i, in.readByte()); + } + return counts; + } + } + + public boolean equals(long thisBucket, AbstractHyperLogLogPlusPlus other, long otherBucket) { + return Objects.equals(precision(), other.precision()) + && Objects.equals(getAlgorithm(thisBucket), other.getAlgorithm(otherBucket)) + && Objects.equals(getComparableData(thisBucket), other.getComparableData(otherBucket)); + } + + public int hashCode(long bucket) { + return Objects.hash(precision(), getAlgorithm(bucket), getComparableData(bucket)); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractLinearCounting.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractLinearCounting.java index 4d79da939ac..87f56a8753d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractLinearCounting.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractLinearCounting.java @@ -35,7 +35,7 @@ public abstract class AbstractLinearCounting extends AbstractCardinalityAlgorith private static final int P2 = 25; public AbstractLinearCounting(int precision) { - super(precision); + super(precision); } /** @@ -59,6 +59,7 @@ public abstract class AbstractLinearCounting extends AbstractCardinalityAlgorith return addEncoded(bucketOrd, k); } + @Override public long cardinality(long bucketOrd) { final long m = 1 << P2; final long v = m - size(bucketOrd); @@ -92,7 +93,7 @@ public abstract class AbstractLinearCounting extends AbstractCardinalityAlgorith /** * number of elements in the iterator */ - long size(); + int size(); /** * Moves the iterator to the next element if it exists. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java index c617dafb840..6601dc0e0a0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java @@ -157,13 +157,12 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) { - if (counts == null || owningBucketOrdinal >= counts.maxBucket() || counts.cardinality(owningBucketOrdinal) == 0) { + if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) { return buildEmptyAggregation(); } // We need to build a copy because the returned Aggregation needs remain usable after // this Aggregator (and its HLL++ counters) is released. - HyperLogLogPlusPlus copy = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); - copy.merge(0, counts, owningBucketOrdinal); + AbstractHyperLogLogPlusPlus copy = counts.clone(owningBucketOrdinal, BigArrays.NON_RECYCLING_INSTANCE); return new InternalCardinality(name, copy, metadata()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlus.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlus.java index d289a68ec81..80dd22371ec 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlus.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlus.java @@ -21,8 +21,6 @@ package org.elasticsearch.search.aggregations.metrics; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.packed.PackedInts; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; @@ -31,14 +29,8 @@ import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.ByteUtils; import org.elasticsearch.common.util.IntArray; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Set; /** * Hyperloglog++ counter, implemented based on pseudo code from @@ -56,11 +48,10 @@ import java.util.Set; * * It supports storing several HyperLogLogPlusPlus structures which are identified by a bucket number. */ -public final class HyperLogLogPlusPlus implements Releasable { +public final class HyperLogLogPlusPlus extends AbstractHyperLogLogPlusPlus { private static final float MAX_LOAD_FACTOR = 0.75f; - private static final boolean LINEAR_COUNTING = false; - private static final boolean HYPERLOGLOG = true; + public static final int DEFAULT_PRECISION = 14; private final BitArray algorithm; @@ -85,35 +76,103 @@ public final class HyperLogLogPlusPlus implements Releasable { return 1L << precision; } - public HyperLogLogPlusPlus(int precision, BigArrays bigArrays, long initialBucketCount) { + super(precision); hll = new HyperLogLog(bigArrays, initialBucketCount, precision); lc = new LinearCounting(bigArrays, initialBucketCount, precision, hll); algorithm = new BitArray(1, bigArrays); } - public int precision() { - return hll.precision(); - } - - public long maxBucket() { + @Override + public long maxOrd() { return hll.runLens.size() >>> hll.precision(); } - public void merge(long thisBucket, HyperLogLogPlusPlus other, long otherBucket) { + @Override + public long cardinality(long bucketOrd) { + if (getAlgorithm(bucketOrd) == LINEAR_COUNTING) { + return lc.cardinality(bucketOrd); + } else { + return hll.cardinality(bucketOrd); + } + } + + @Override + protected boolean getAlgorithm(long bucketOrd) { + return algorithm.get(bucketOrd); + } + + @Override + protected AbstractLinearCounting.HashesIterator getLinearCounting(long bucketOrd) { + return lc.values(bucketOrd); + } + + @Override + protected AbstractHyperLogLog.RunLenIterator getHyperLogLog(long bucketOrd) { + return hll.getRunLens(bucketOrd); + } + + @Override + public void collect(long bucket, long hash) { + hll.ensureCapacity(bucket + 1); + if (algorithm.get(bucket) == LINEAR_COUNTING) { + final int newSize = lc.collect(bucket, hash); + if (newSize > lc.threshold) { + upgradeToHll(bucket); + } + } else { + hll.collect(bucket, hash); + } + } + + @Override + public void close() { + Releasables.close(algorithm, hll, lc); + } + + protected void addRunLen(long bucketOrd, int register, int runLen) { + if (algorithm.get(bucketOrd) == LINEAR_COUNTING) { + upgradeToHll(bucketOrd); + } + hll.addRunLen(0, register, runLen); + } + + void upgradeToHll(long bucketOrd) { + hll.ensureCapacity(bucketOrd + 1); + final AbstractLinearCounting.HashesIterator hashes = lc.values(bucketOrd); + // We need to copy values into an arrays as we will override + // the values on the buffer + final IntArray values = lc.bigArrays.newIntArray(hashes.size()); + try { + int i = 0; + while (hashes.next()) { + values.set(i++, hashes.value()); + } + assert i == hashes.size(); + hll.reset(bucketOrd); + for (long j = 0; j < values.size(); ++j) { + final int encoded = values.get(j); + hll.collectEncoded(bucketOrd, encoded); + } + algorithm.set(bucketOrd); + } finally { + Releasables.close(values); + } + } + + public void merge(long thisBucket, AbstractHyperLogLogPlusPlus other, long otherBucket) { if (precision() != other.precision()) { throw new IllegalArgumentException(); } hll.ensureCapacity(thisBucket + 1); - if (other.algorithm.get(otherBucket) == LINEAR_COUNTING) { - merge(thisBucket, other.lc, otherBucket); + if (other.getAlgorithm(otherBucket) == LINEAR_COUNTING) { + merge(thisBucket, other.getLinearCounting(otherBucket)); } else { - merge(thisBucket, other.hll, otherBucket); + merge(thisBucket, other.getHyperLogLog(otherBucket)); } } - private void merge(long thisBucket, AbstractLinearCounting other, long otherBucket) { - final AbstractLinearCounting.HashesIterator values = other.values(otherBucket); + private void merge(long thisBucket, AbstractLinearCounting.HashesIterator values) { while (values.next()) { final int encoded = values.value(); if (algorithm.get(thisBucket) == LINEAR_COUNTING) { @@ -127,124 +186,23 @@ public final class HyperLogLogPlusPlus implements Releasable { } } - private void merge(long thisBucket, AbstractHyperLogLog other, long otherBucket) { + private void merge(long thisBucket, AbstractHyperLogLog.RunLenIterator runLens) { if (algorithm.get(thisBucket) != HYPERLOGLOG) { upgradeToHll(thisBucket); } - hll.merge(thisBucket, other, otherBucket); - } - - public void collect(long bucket, long hash) { - hll.ensureCapacity(bucket + 1); - if (algorithm.get(bucket) == LINEAR_COUNTING) { - final int newSize = lc.collect(bucket, hash); - if (newSize > lc.threshold) { - upgradeToHll(bucket); - } - } else { - hll.collect(bucket, hash); + for (int i = 0; i < hll.m; ++i) { + runLens.next(); + hll.addRunLen(thisBucket, i, runLens.value()); } } - public long cardinality(long bucket) { - if (algorithm.get(bucket) == LINEAR_COUNTING) { - return lc.cardinality(bucket); - } else { - return hll.cardinality(bucket); - } - } - - void upgradeToHll(long bucket) { - hll.ensureCapacity(bucket + 1); - final AbstractLinearCounting.HashesIterator hashes = lc.values(bucket); - // We need to copy values into an arrays as we will override - // the values on the buffer - final IntArray values = lc.bigArrays.newIntArray(hashes.size()); - try { - int i = 0; - while (hashes.next()) { - values.set(i++, hashes.value()); - } - assert i == hashes.size(); - hll.reset(bucket); - for (long j = 0; j < values.size(); ++j) { - final int encoded = values.get(j); - hll.collectEncoded(bucket, encoded); - } - algorithm.set(bucket); - } finally { - Releasables.close(values); - } - } - - @Override - public void close() { - Releasables.close(algorithm, hll, lc); - } - - private Object getComparableData(long bucket) { - if (algorithm.get(bucket) == LINEAR_COUNTING) { - return lc.getComparableData(bucket); - } else { - return hll.getComparableData(bucket); - } - } - - public int hashCode(long bucket) { - return Objects.hash(precision(), algorithm.get(bucket), getComparableData(bucket)); - } - - public boolean equals(long bucket, HyperLogLogPlusPlus other) { - return Objects.equals(precision(), other.precision()) - && Objects.equals(algorithm.get(bucket), other.algorithm.get(bucket)) - && Objects.equals(getComparableData(bucket), other.getComparableData(bucket)); - } - - public void writeTo(long bucket, StreamOutput out) throws IOException { - out.writeVInt(precision()); - if (algorithm.get(bucket) == LINEAR_COUNTING) { - out.writeBoolean(LINEAR_COUNTING); - AbstractLinearCounting.HashesIterator hashes = lc.values(bucket); - out.writeVLong(hashes.size()); - while (hashes.next()) { - out.writeInt(hashes.value()); - } - } else { - out.writeBoolean(HYPERLOGLOG); - AbstractHyperLogLog.RunLenIterator iterator = hll.getRunLens(bucket); - while (iterator.next()){ - out.writeByte(iterator.value()); - } - } - } - - public static HyperLogLogPlusPlus readFrom(StreamInput in, BigArrays bigArrays) throws IOException { - final int precision = in.readVInt(); - HyperLogLogPlusPlus counts = new HyperLogLogPlusPlus(precision, bigArrays, 1); - final boolean algorithm = in.readBoolean(); - if (algorithm == LINEAR_COUNTING) { - counts.algorithm.clear(0); - final long size = in.readVLong(); - for (long i = 0; i < size; ++i) { - final int encoded = in.readInt(); - counts.lc.addEncoded(0, encoded); - } - } else { - counts.algorithm.set(0); - for (int i = 0; i < counts.hll.m; ++i) { - counts.hll.addRunLen(0, i, in.readByte()); - } - } - return counts; - } - private static class HyperLogLog extends AbstractHyperLogLog implements Releasable { - private final BigArrays bigArrays; private final HyperLogLogIterator iterator; // array for holding the runlens. private ByteArray runLens; + HyperLogLog(BigArrays bigArrays, long initialBucketCount, int precision) { super(precision); this.runLens = bigArrays.newByteArray(initialBucketCount << precision); @@ -268,20 +226,6 @@ public final class HyperLogLogPlusPlus implements Releasable { runLens.fill(bucketOrd << p, (bucketOrd << p) + m, (byte) 0); } - protected Object getComparableData(long bucketOrd) { - Map values = new HashMap<>(); - for (long i = 0; i < runLens.size(); i++) { - byte runLength = runLens.get((bucketOrd << p) + i); - Integer numOccurances = values.get(runLength); - if (numOccurances == null) { - values.put(runLength, 1); - } else { - values.put(runLength, numOccurances + 1); - } - } - return values; - } - protected void ensureCapacity(long numBuckets) { runLens = bigArrays.grow(runLens, numBuckets << p); } @@ -296,8 +240,8 @@ public final class HyperLogLogPlusPlus implements Releasable { private final HyperLogLog hll; private final int m, p; - private int pos; - private long start; + int pos; + long start; private byte value; HyperLogLogIterator(HyperLogLog hll, int p, int m) { @@ -386,15 +330,6 @@ public final class HyperLogLogPlusPlus implements Releasable { return iterator; } - protected Object getComparableData(long bucketOrd) { - Set values = new HashSet<>(); - HashesIterator iteratorValues = values(bucketOrd); - while (iteratorValues.next()) { - values.add(iteratorValues.value()); - } - return values; - } - private long index(long bucketOrd, int index) { return (bucketOrd << p) + (index << 2); } @@ -430,8 +365,8 @@ public final class HyperLogLogPlusPlus implements Releasable { private final LinearCounting lc; private final int capacity; - private int pos; - private long size, bucketOrd; + private int pos, size; + private long bucketOrd; private int value; LinearCountingIterator(LinearCounting lc, int capacity) { @@ -439,14 +374,14 @@ public final class HyperLogLogPlusPlus implements Releasable { this.capacity = capacity; } - void reset(long bucketOrd, long size) { + void reset(long bucketOrd, int size) { this.bucketOrd = bucketOrd; this.size = size; this.pos = size == 0 ? capacity : 0; } @Override - public long size() { + public int size() { return size; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java new file mode 100644 index 00000000000..6db20006ab7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java @@ -0,0 +1,205 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; + +/** + * AbstractHyperLogLogPlusPlus instance that only supports linear counting. The maximum number of hashes supported + * by the structure is determined at construction time. + * + * This structure expects all the added values to be distinct and therefore there are no checks + * if an element has been previously added. + */ +final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implements Releasable { + + private final LinearCounting lc; + + /** + * Create an sparse HLL++ algorithm where capacity is the maximum number of hashes this structure can hold + * per bucket. + */ + HyperLogLogPlusPlusSparse(int precision, BigArrays bigArrays, int capacity, int initialSize) { + super(precision); + this.lc = new LinearCounting(precision, bigArrays, capacity, initialSize); + } + + @Override + public long maxOrd() { + return lc.sizes.size(); + } + + @Override + public long cardinality(long bucketOrd) { + return lc.cardinality(bucketOrd); + } + + @Override + protected boolean getAlgorithm(long bucketOrd) { + return LINEAR_COUNTING; + } + + @Override + protected AbstractLinearCounting.HashesIterator getLinearCounting(long bucketOrd) { + return lc.values(bucketOrd); + } + + @Override + protected AbstractHyperLogLog.RunLenIterator getHyperLogLog(long bucketOrd) { + throw new IllegalArgumentException("Implementation does not support HLL structures"); + } + + @Override + public void collect(long bucket, long hash) { + lc.collect(bucket, hash); + } + + @Override + public void close() { + Releasables.close(lc); + } + + protected void addEncoded(long bucket, int encoded) { + lc.addEncoded(bucket, encoded); + } + + private static class LinearCounting extends AbstractLinearCounting implements Releasable { + + private final int capacity; + private final BigArrays bigArrays; + private final LinearCountingIterator iterator; + // We are actually using HyperLogLog's runLens array but interpreting it as a hash set for linear counting. + // Number of elements stored. + private IntArray values; + private IntArray sizes; + + LinearCounting(int p, BigArrays bigArrays, int capacity, int initialSize) { + super(p); + this.bigArrays = bigArrays; + this.capacity = capacity; + values = bigArrays.newIntArray(initialSize * capacity); + sizes = bigArrays.newIntArray(initialSize * capacity); + iterator = new LinearCountingIterator(this, capacity); + } + + @Override + protected int addEncoded(long bucketOrd, int encoded) { + assert encoded != 0; + return set(bucketOrd, encoded); + } + + @Override + protected int size(long bucketOrd) { + if (bucketOrd >= sizes.size()) { + return 0; + } + final int size = sizes.get(bucketOrd); + assert size == recomputedSize(bucketOrd); + return size; + } + + @Override + protected HashesIterator values(long bucketOrd) { + iterator.reset(bucketOrd, size(bucketOrd)); + return iterator; + } + + private long index(long bucketOrd, int index) { + return (bucketOrd * capacity) + index; + } + + private int get(long bucketOrd, int index) { + long globalIndex = index(bucketOrd, index); + if (values.size() < globalIndex) { + return 0; + } + return values.get(globalIndex); + } + + private int set(long bucketOrd, int value) { + int size = size(bucketOrd); + if (size == 0) { + sizes = bigArrays.grow(sizes, bucketOrd + 1); + values = bigArrays.grow(values, (bucketOrd * capacity) + capacity); + } + values.set(index(bucketOrd, size), value); + return sizes.increment(bucketOrd, 1); + } + + private int recomputedSize(long bucketOrd) { + for (int i = 0; i < capacity; ++i) { + final int v = get(bucketOrd, i); + if (v == 0) { + return i; + } + } + return capacity; + } + + @Override + public void close() { + Releasables.close(values, sizes); + } + } + + private static class LinearCountingIterator implements AbstractLinearCounting.HashesIterator { + + private final LinearCounting lc; + private final int capacity; + long start; + long end; + private int value, size; + private long pos; + + LinearCountingIterator(LinearCounting lc, int capacity) { + this.lc = lc; + this.capacity = capacity; + } + + void reset(long bucketOrd, int size) { + this.start = bucketOrd * capacity; + this.size = size; + this.end = start + size; + this.pos = start; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean next() { + if (pos < end) { + value = lc.values.get(pos++); + return true; + } + return false; + } + + @Override + public int value() { + return value; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java index c959d000b27..53993905621 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java @@ -32,9 +32,9 @@ import java.util.Map; import java.util.Objects; public final class InternalCardinality extends InternalNumericMetricsAggregation.SingleValue implements Cardinality { - private final HyperLogLogPlusPlus counts; + private final AbstractHyperLogLogPlusPlus counts; - InternalCardinality(String name, HyperLogLogPlusPlus counts, Map metadata) { + InternalCardinality(String name, AbstractHyperLogLogPlusPlus counts, Map metadata) { super(name, metadata); this.counts = counts; } @@ -46,7 +46,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation super(in); format = in.readNamedWriteable(DocValueFormat.class); if (in.readBoolean()) { - counts = HyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE); + counts = AbstractHyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE); } else { counts = null; } @@ -78,34 +78,30 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation return counts == null ? 0 : counts.cardinality(0); } - public HyperLogLogPlusPlus getCounts() { + public AbstractHyperLogLogPlusPlus getCounts() { return counts; } @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - InternalCardinality reduced = null; + HyperLogLogPlusPlus reduced = null; for (InternalAggregation aggregation : aggregations) { final InternalCardinality cardinality = (InternalCardinality) aggregation; if (cardinality.counts != null) { if (reduced == null) { - reduced = new InternalCardinality(name, new HyperLogLogPlusPlus(cardinality.counts.precision(), - BigArrays.NON_RECYCLING_INSTANCE, 1), getMetadata()); + reduced = new HyperLogLogPlusPlus(cardinality.counts.precision(), + BigArrays.NON_RECYCLING_INSTANCE, 1); } - reduced.merge(cardinality); + reduced.merge(0, cardinality.counts, 0); } } if (reduced == null) { // all empty return aggregations.get(0); } else { - return reduced; - } - } + return new InternalCardinality(name, reduced, getMetadata()); - public void merge(InternalCardinality other) { - assert counts != null && other != null; - counts.merge(0, other.counts, 0); + } } @Override @@ -127,10 +123,10 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation if (super.equals(obj) == false) return false; InternalCardinality other = (InternalCardinality) obj; - return counts.equals(0, other.counts); + return counts.equals(0, other.counts, 0); } - HyperLogLogPlusPlus getState() { + AbstractHyperLogLogPlusPlus getState() { return counts; } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java new file mode 100644 index 00000000000..2e7f1fb24ba --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import com.carrotsearch.hppc.BitMixer; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MAX_PRECISION; +import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MIN_PRECISION; + +public class HyperLogLogPlusPlusSparseTests extends ESTestCase { + + public void testEquivalence() throws IOException { + final int p = randomIntBetween(MIN_PRECISION, MAX_PRECISION); + final HyperLogLogPlusPlus single = new HyperLogLogPlusPlus(p, BigArrays.NON_RECYCLING_INSTANCE, 0); + final int numBuckets = randomIntBetween(2, 100); + final int numValues = randomIntBetween(1, 100000); + final int maxValue = randomIntBetween(1, randomBoolean() ? 1000: 1000000); + for (int i = 0; i < numValues; ++i) { + final int n = randomInt(maxValue); + final long hash = BitMixer.mix64(n); + single.collect(randomInt(numBuckets), hash); + } + for (int i = 0; i < numBuckets; i++) { + // test clone + AbstractHyperLogLogPlusPlus clone = single.clone(i, BigArrays.NON_RECYCLING_INSTANCE); + if (single.getAlgorithm(i) == AbstractHyperLogLogPlusPlus.LINEAR_COUNTING) { + assertTrue(clone instanceof HyperLogLogPlusPlusSparse); + } else { + assertTrue(clone instanceof HyperLogLogPlusPlus); + } + checkEquivalence(single, i, clone, 0); + // test serialize + BytesStreamOutput out = new BytesStreamOutput(); + single.writeTo(i, out); + clone = AbstractHyperLogLogPlusPlus.readFrom(out.bytes().streamInput(), BigArrays.NON_RECYCLING_INSTANCE); + if (single.getAlgorithm(i) == AbstractHyperLogLogPlusPlus.LINEAR_COUNTING) { + assertTrue(clone instanceof HyperLogLogPlusPlusSparse); + } else { + assertTrue(clone instanceof HyperLogLogPlusPlus); + } + checkEquivalence(single, i, clone, 0); + // test merge + final HyperLogLogPlusPlus merge = new HyperLogLogPlusPlus(p, BigArrays.NON_RECYCLING_INSTANCE, 0); + merge.merge(0, clone, 0); + checkEquivalence(merge, 0, clone, 0); + } + } + + private void checkEquivalence(AbstractHyperLogLogPlusPlus first, int firstBucket, + AbstractHyperLogLogPlusPlus second, int secondBucket) { + assertEquals(first.hashCode(firstBucket), second.hashCode(secondBucket)); + assertEquals(first.cardinality(firstBucket), second.cardinality(0)); + assertTrue(first.equals(firstBucket, second, secondBucket)); + assertTrue(second.equals(secondBucket, first, firstBucket)); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalCardinalityTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalCardinalityTests.java index 23ab039fa96..35b6c3e8fa6 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalCardinalityTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalCardinalityTests.java @@ -90,7 +90,7 @@ public class InternalCardinalityTests extends InternalAggregationTestCase metadata = instance.getMetadata(); switch (between(0, 2)) { case 0: diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java index 7e18f74658c..806389a2303 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java @@ -16,6 +16,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; +import org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus; import org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus; import org.elasticsearch.search.aggregations.metrics.InternalCardinality; import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; @@ -68,7 +69,7 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator try { long cardinality = 0; for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { - HyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]); + AbstractHyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]); if (hll == null && bucketHll != null) { // We have to create a new HLL because otherwise it will alter the // existing cardinality sketch and bucket value @@ -94,9 +95,9 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator } } - private HyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg, - InternalMultiBucketAggregation.InternalBucket bucket, - String aggPath) { + private AbstractHyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg, + InternalMultiBucketAggregation.InternalBucket bucket, + String aggPath) { List aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList(); Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList); if (propertyValue == null) {