diff --git a/docs/content/Aggregations.md b/docs/content/Aggregations.md index 1c28e1dc10f..cdf33ed2b9b 100644 --- a/docs/content/Aggregations.md +++ b/docs/content/Aggregations.md @@ -155,8 +155,33 @@ Determine the number of distinct are assigned to. ### HyperUnique aggregator -Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension that has been aggregated as a hyperUnique metric at indexing time. +Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension that has been aggregated as a "hyperUnique" metric at indexing time. ```json { "type" : "hyperUnique", "name" : , "fieldName" : } ``` + +### ApproxHistogram aggregator + +This aggregator is based on [http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf](http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf) to compute approximate histograms. + +To use this feature, an "approxHistogram" aggregator must be included at indexing time. The ingestion aggregator can only apply to numeric values. To query for results, an "approxHistogramFold" aggregator must be included in the query. + +```json +{ + "type" : "approxHistogram(ingestion), approxHistogramFold(query)", + "name" : , + "fieldName" : , + "resolution" : , + "numBuckets" : , + "lowerLimit" : , + "upperLimit" : +} +``` + +|Property|Description|Default| +|--------|-----------|-------| +|`resolution`|Number of centroids (data points) to store. The higher the resolution, the more accurate results are, but the slower computation will be.|50| +|`numBuckets`|Number of output buckets for the resulting histogram.|7| +|`lowerLimit`/`upperLimit`|Restrict the approximation to the given range. The values outside this range will be aggregated into two centroids. Counts of values outside this range are still maintained. |-INF/+INF| + diff --git a/docs/content/Post-aggregations.md b/docs/content/Post-aggregations.md index 4dce46ceff1..e4ce2211f67 100644 --- a/docs/content/Post-aggregations.md +++ b/docs/content/Post-aggregations.md @@ -64,7 +64,7 @@ Example JavaScript aggregator: "function": "function(delta, total) { return 100 * Math.abs(delta) / total; }" } ``` -### `hyperUniqueCardinality` post-aggregator +### HyperUnique Cardinality post-aggregator The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations. @@ -90,8 +90,7 @@ It can be used in a sample calculation as so: } ``` - -### Example Usage +#### Example Usage In this example, let’s calculate a simple percentage using post aggregators. Let’s imagine our data set has a metric called "total". @@ -122,5 +121,70 @@ The format of the query JSON is as follows: } ... } +``` -``` \ No newline at end of file +### Approximate Histogram post-aggregators + +Post-aggregators used to transform opaque approximate histogram objects +into actual histogram representations, and to compute various distribution metrics. + +#### equal buckets post-aggregator + +Computes a visual representation of the approximate histogram with a given number of equal-sized bins + +```json +{ "type" : "equalBuckets", "name" : , "fieldName" : , + "numBuckets" : } +``` + +#### buckets post-aggregator + +Computes a visual representation given an initial breakpoint, offset, and a bucket size. + +```json +{ "type" : "buckets", "name" : , "fieldName" : , + "bucketSize" : , "offset" : } +``` + +#### custom buckets post-aggregator + +Computes a visual representation of the approximate histogram with bins laid out according to the given breaks + +```json +{ "type" : "customBuckets", "name" : , "fieldName" : , + "breaks" : [ , , ... ] } +``` + +#### min post-aggregator + +Returns the minimum value of the underlying approximate histogram aggregator + +```json +{ "type" : "min", "name" : , "fieldName" : } +``` + +#### max post-aggregator + +Returns the maximum value of the underlying approximate histogram aggregator + +```json +{ "type" : "max", "name" : , "fieldName" : } +``` + +#### quantile post-aggregator + +Computes a single quantile based on the underlying approximate histogram aggregator + +```json +{ "type" : "quantile", "name" : , "fieldName" : , + "probability" : } +``` + +#### quantiles post-aggregator + +Computes an array of quantiles based on the underlying approximate histogram aggregator + +```json +{ "type" : "quantiles", "name" : , "fieldName" : , + "probabilities" : [ , , ... ] } +``` diff --git a/histogram/pom.xml b/histogram/pom.xml new file mode 100644 index 00000000000..f3d9c31f617 --- /dev/null +++ b/histogram/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + io.druid.extensions + druid-histogram + druid-histogram + druid-histogram + + + io.druid + druid + 0.6.129-SNAPSHOT + + + + + io.druid + druid-processing + ${project.parent.version} + + + + + io.druid + druid-processing + ${project.parent.version} + test + test-jar + + + junit + junit + test + + + + + + + maven-jar-plugin + + + + true + true + + + + + + + diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java new file mode 100644 index 00000000000..fc06968f870 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java @@ -0,0 +1,1680 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ApproximateHistogram +{ + public static final int DEFAULT_HISTOGRAM_SIZE = 50; + public static final int DEFAULT_BUCKET_SIZE = 7; + + // max size of the histogram (number of bincount/position pairs) + int size; + + public float[] positions; + public long[] bins; + + // used bincount + int binCount; + // min value that's been put into histogram + float min; + float max; + // total number of values that have been put into histogram + transient long count; + + // lower limit to maintain resolution + // cutoff above which we merge bins is the difference of the limits / (size - 3) + // so we'll set size = 203, lower limit = 0, upper limit = 10.00 if we don't want + // to merge differences < 0.05 + transient float lowerLimit; + transient float upperLimit; + + // use sign bit to indicate approximate bin and remaining bits for bin count + private static final long APPROX_FLAG_BIT = Long.MIN_VALUE; + private static final long COUNT_BITS = Long.MAX_VALUE; + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ApproximateHistogram that = (ApproximateHistogram) o; + + if (size != that.size) { + return false; + } + if (binCount != that.binCount) { + return false; + } + if (Float.compare(that.max, max) != 0) { + return false; + } + if (Float.compare(that.min, min) != 0) { + return false; + } + for (int i = 0; i < binCount; ++i) { + if (positions[i] != that.positions[i]) { + return false; + } + } + for (int i = 0; i < binCount; ++i) { + if (bins[i] != that.bins[i]) { + return false; + } + } + return true; + } + + @Override + public int hashCode() + { + int result = size; + result = 31 * result + (positions != null ? ArrayUtils.hashCode(positions, 0, binCount) : 0); + result = 31 * result + (bins != null ? ArrayUtils.hashCode(bins, 0, binCount) : 0); + result = 31 * result + binCount; + result = 31 * result + (min != +0.0f ? Float.floatToIntBits(min) : 0); + result = 31 * result + (max != +0.0f ? Float.floatToIntBits(max) : 0); + return result; + } + + + public ApproximateHistogram( + int size, + float[] positions, + long[] bins, + int binCount, + float min, + float max, + long count, + float lowerLimit, + float upperLimit + ) + { + Preconditions.checkArgument(positions.length == bins.length, "position and bin array must have same size"); + Preconditions.checkArgument(binCount <= size, "binCount must be less or equal to size"); + + this.size = size; + this.positions = positions; + this.bins = bins; + this.binCount = binCount; + this.min = min; + this.max = max; + this.count = count; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + } + + public ApproximateHistogram() + { + this(DEFAULT_HISTOGRAM_SIZE); + } + + public ApproximateHistogram(int size) + { + this( + size, //size + new float[size], //positions + new long[size], //bins + 0, //binCount + Float.POSITIVE_INFINITY, //min + Float.NEGATIVE_INFINITY, //max + 0, //count + Float.NEGATIVE_INFINITY, //lowerLimit + Float.POSITIVE_INFINITY //upperLimit + ); + } + + public ApproximateHistogram(int size, float lowerLimit, float upperLimit) + { + this( + size, //size + new float[size], //positions + new long[size], //bins + 0, //binCount + Float.POSITIVE_INFINITY, //min + Float.NEGATIVE_INFINITY, //max + 0, //count + lowerLimit, //lowerLimit + upperLimit //upperLimit + ); + } + + public ApproximateHistogram(int binCount, float[] positions, long[] bins, float min, float max) + { + this( + positions.length, //size + positions, //positions + bins, //bins + binCount, //binCount + min, //min + max, //max + sumBins(bins, binCount), //count + Float.NEGATIVE_INFINITY, //lowerLimit + Float.POSITIVE_INFINITY //upperLimit + ); + } + + public long count() { return count; } + + public float min() { return min; } + + public float max() { return max; } + + public int binCount() { return binCount; } + + public int capacity() { return size; } + + public float[] positions() { return Arrays.copyOfRange(positions, 0, binCount); } + + public long[] bins() + { + long[] counts = new long[binCount]; + for (int i = 0; i < binCount; ++i) { + counts[i] = bins[i] & COUNT_BITS; + } + return counts; + } + + @Override + public String toString() + { + return "ApproximateHistogram{" + + "size=" + size + + ", lowerLimit=" + lowerLimit + + ", upperLimit=" + upperLimit + + ", positions=" + Arrays.toString(positions()) + + ", bins=" + getBinsString() + + ", binCount=" + binCount + + ", min=" + min + + ", max=" + max + + ", count=" + count + + '}'; + } + + public long getExactCount() + { + long exactCount = 0; + for (int i = 0; i < binCount; ++i) { + if ((bins[i] & APPROX_FLAG_BIT) == 0) { + exactCount += (bins[i] & COUNT_BITS); + } + } + return exactCount; + } + + public float getMin() { return this.min;} + + public float getMax() { return this.max;} + + private static long sumBins(long[] bins, int binCount) + { + long count = 0; + for (int i = 0; i < binCount; ++i) { + count += bins[i] & COUNT_BITS; + } + return count; + } + + /** + * Returns a string representation of the actual bin counts + * + * @return + */ + protected String getBinsString() + { + StringBuilder s = new StringBuilder(); + s.append('['); + for (int i = 0; i < bins.length; ++i) { + if (i > 0) { + s.append(", "); + } + if ((bins[i] & APPROX_FLAG_BIT) != 0) { + s.append("*"); + } + s.append(bins[i] & COUNT_BITS); + } + s.append(']'); + return s.toString(); + } + + public void setLowerLimit(float lowerLimit) + { + this.lowerLimit = lowerLimit; + } + + public void setUpperLimit(float upperLimit) + { + this.upperLimit = upperLimit; + } + + /** + * Adds the given value to the histogram + * + * @param value the value to be added + */ + public void offer(float value) + { + // update min/max + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + + // initial value + if (binCount == 0) { + positions[0] = value; + bins[0] = 1; + count++; + + binCount++; + return; + } + + final int index = Arrays.binarySearch(positions, 0, binCount, value); + + if (index >= 0) { + // we have an exact match, simply increase the count, but keep the approximate flag + bins[index] = (bins[index] & APPROX_FLAG_BIT) | ((bins[index] & COUNT_BITS) + 1); + count++; + return; + } + + // otherwise merge the value into a new or existing bin at the following position + final int insertAt = -(index + 1); + + if (binCount < size) { + // we have a spare slot, put the value into a new bin + shiftRight(insertAt, binCount); + + positions[insertAt] = value; + bins[insertAt] = 1; + count++; + + binCount++; + return; + } + + // no more slots available merge the new value into and existing bin + // or merge existing bins before inserting the new one + + int minPos = minDeltaIndex(); + float minDelta = minPos >= 0 ? positions[minPos + 1] - positions[minPos] : Float.MAX_VALUE; + + // determine the distance of new value to the nearest bins + final float deltaRight = insertAt < binCount ? positions[insertAt] - value : Float.MAX_VALUE; + final float deltaLeft = insertAt > 0 ? value - positions[insertAt - 1] : Float.MAX_VALUE; + + boolean mergeValue = false; + if (deltaRight < minDelta) { + minDelta = deltaRight; + minPos = insertAt; + mergeValue = true; + } + if (deltaLeft < minDelta) { + minDelta = deltaLeft; + minPos = insertAt - 1; + mergeValue = true; + } + + if (mergeValue) { + // merge new value into an existing bin and set approximate flag + final long k = bins[minPos] & COUNT_BITS; + positions[minPos] = (positions[minPos] * k + value) / (k + 1); + bins[minPos] = (k + 1) | APPROX_FLAG_BIT; + count++; + } else { + // merge the closest bins together and insert new value as a separate bin + mergeInsert(minPos, insertAt, value, 1); + } + } + + protected int minDeltaIndex() + { + // determine minimum distance between existing bins + float minDelta = Float.MAX_VALUE; + int minPos = -1; + for (int i = 0; i < binCount - 1; ++i) { + float delta = (positions[i + 1] - positions[i]); + if (delta < minDelta) { + minDelta = delta; + minPos = i; + } + } + return minPos; + } + + /** + * Merges the bin in the given position with the next bin + * + * @param index index of the bin to merge, index must satisfy 0 <= index < binCount - 1 + */ + protected void merge(final int index) + { + mergeInsert(index, -1, 0, 0); + } + + /** + * Merges the bin in the mergeAt position with the bin in position mergeAt+1 + * and simultaneously inserts the given bin (v,c) as a new bin at position insertAt + * + * @param mergeAt index of the bin to be merged + * @param insertAt index to insert the new bin at + * @param v bin position + * @param c bin count + */ + protected void mergeInsert(final int mergeAt, int insertAt, final float v, final long c) + { + final long k0 = (bins[mergeAt] & COUNT_BITS); + final long k1 = (bins[mergeAt + 1] & COUNT_BITS); + final long sum = k0 + k1; + + // merge bin at given position with the next bin and set approximate flag + positions[mergeAt] = (float) (((double) positions[mergeAt] * k0 + (double) positions[mergeAt + 1] * k1) / sum); + bins[mergeAt] = sum | APPROX_FLAG_BIT; + + final int unusedIndex = mergeAt + 1; + + if (insertAt >= 0) { + // use unused slot to shift array left or right and make space for the new bin to insert + if (insertAt < unusedIndex) { + shiftRight(insertAt, unusedIndex); + } else if (insertAt >= unusedIndex) { + shiftLeft(unusedIndex, insertAt - 1); + insertAt--; + } + positions[insertAt] = v; + bins[insertAt] = c; + count++; + } else { + // simple merging of bins, shift everything left and free up the unused bin + shiftLeft(unusedIndex, binCount - 1); + binCount--; + } + } + + /** + * Shifts the given range the histogram bins one slot to the right + * + * @param start index of the first bin to shift + * @param end index of the rightmost bin to shift into + */ + protected void shiftRight(int start, int end) + { + float prevVal = positions[start]; + long prevCnt = bins[start]; + + for (int i = start + 1; i <= end; ++i) { + float tmpVal = positions[i]; + long tmpCnt = bins[i]; + + positions[i] = prevVal; + bins[i] = prevCnt; + + prevVal = tmpVal; + prevCnt = tmpCnt; + } + } + + /** + * Shifts the given range of histogram bins one slot to the left + * + * @param start index of the leftmost empty bin to shift into + * @param end index of the last bin to shift left + */ + protected void shiftLeft(int start, int end) + { + for (int i = start; i < end; ++i) { + positions[i] = positions[i + 1]; + bins[i] = bins[i + 1]; + } + } + + public ApproximateHistogram fold(ApproximateHistogram h) + { + return fold(h, null, null, null); + } + + public ApproximateHistogram fold(ApproximateHistogram h, float[] mergedPositions, long[] mergedBins, float[] deltas) + { + if (size == 0) { + return copy(h); + } else { + return foldMin(h, mergedPositions, mergedBins, deltas); + } + } + + public ApproximateHistogram foldFast(ApproximateHistogram h) + { + return foldFast(h, null, null); + } + + /** + * @param h histogram to be merged into the current histogram + * @param mergedPositions temporary buffer of size greater or equal to this.capacity() + * @param mergedBins temporary buffer of size greater or equal to this.capacity() + * + * @return returns this histogram with h folded into it + */ + public ApproximateHistogram foldFast(ApproximateHistogram h, float[] mergedPositions, long[] mergedBins) + { + if (size == 0) { + return copy(h); + } else { + return foldRule(h, mergedPositions, mergedBins); + } + } + + /** + * Copies histogram h into the current histogram. + * + * @param h + * + * @return + */ + public ApproximateHistogram copy(ApproximateHistogram h) + { + this.size = h.size; + this.positions = new float[size]; + this.bins = new long[size]; + + System.arraycopy(h.positions, 0, this.positions, 0, h.binCount); + System.arraycopy(h.bins, 0, this.bins, 0, h.binCount); + this.min = h.min; + this.max = h.max; + this.binCount = h.binCount; + this.count = h.count; + return this; + } + + //approximate histogram solution using min heap to store location of min deltas + protected ApproximateHistogram foldMin( + ApproximateHistogram h, + float[] mergedPositions, + long[] mergedBins, + float[] deltas + ) + { + // find common min / max + float mergedMin = this.min < h.min ? this.min : h.min; + float mergedMax = this.max > h.max ? this.max : h.max; + long mergedCount = this.count + h.count; + + int maxSize = this.binCount + h.binCount; + int[] next = new int[maxSize]; + int[] prev = new int[maxSize]; + + // use preallocated arrays if passed + if (mergedPositions == null || mergedBins == null || deltas == null) { + mergedPositions = new float[maxSize]; + mergedBins = new long[maxSize]; + deltas = new float[maxSize]; + } else { + Preconditions.checkArgument( + mergedPositions.length >= maxSize, + "temp buffer [mergedPositions] too small: length must be at least [%d], got [%d]", + maxSize, + mergedPositions.length + ); + Preconditions.checkArgument( + mergedBins.length >= maxSize, + "temp buffer [mergedBins] too small: length must be at least [%d], got [%d]", + maxSize, + mergedPositions.length + ); + Preconditions.checkArgument( + deltas.length >= maxSize, + "temp buffer [deltas] too small: length must be at least [%d], got [%d]", + maxSize, + mergedPositions.length + ); + } + + int mergedBinCount = combineBins( + this.binCount, this.positions, this.bins, h.binCount, h.positions, h.bins, + mergedPositions, mergedBins, deltas + ); + if (mergedBinCount == 0) { + return this; + } + + // determine how many bins to merge + int numMerge = mergedBinCount - this.size; + if (numMerge < 0) { + numMerge = 0; + } + + // perform the required number of merges + mergeBins(mergedBinCount, mergedPositions, mergedBins, deltas, numMerge, next, prev); + + // copy merged values + int i = 0; + int k = 0; + while (i < mergedBinCount) { + this.positions[k] = mergedPositions[i]; + this.bins[k] = mergedBins[i]; + ++k; + i = next[i]; + } + this.binCount = mergedBinCount - numMerge; + this.min = mergedMin; + this.max = mergedMax; + this.count = mergedCount; + return this; + } + + protected ApproximateHistogram foldRule(ApproximateHistogram h, float[] mergedPositions, long[] mergedBins) + { + // ruleCombine bins requires at least one bin + if (h.binCount == 0) { + return this; + } + + // find common min / max + float mergedMin = this.min < h.min ? this.min : h.min; + float mergedMax = this.max > h.max ? this.max : h.max; + long mergedCount = this.count + h.count; + this.min = mergedMin; + this.max = mergedMax; + + // use preallocated arrays if passed + if (mergedPositions == null) { + mergedPositions = new float[this.size]; + mergedBins = new long[this.size]; + } + + int mergedBinCount; + if (this.binCount + h.binCount <= this.size) { + // no need to merge bins + mergedBinCount = combineBins( + this.binCount, this.positions, this.bins, + h.binCount, h.positions, h.bins, + mergedPositions, mergedBins, null + ); + } else { + mergedBinCount = ruleCombineBins( + this.binCount, this.positions, this.bins, h.binCount, h.positions, h.bins, + mergedPositions, mergedBins + ); + } + for (int i = 0; i < mergedBinCount; ++i) { + this.positions[i] = mergedPositions[i]; + this.bins[i] = mergedBins[i]; + } + + this.binCount = mergedBinCount; + this.count = mergedCount; + + return this; + } + + protected int ruleCombineBins( + int leftBinCount, float[] leftPositions, long[] leftBins, + int rightBinCount, float[] rightPositions, long[] rightBins, + float[] mergedPositions, long[] mergedBins + ) + { + final float cutoff; + // assumes binCount is greater than one for both histograms + // if upper and lower limits are set, we use the first and last used values of the arrays + // for information below and above the limits, respectively + if (this.upperLimit != Float.POSITIVE_INFINITY && this.lowerLimit != Float.NEGATIVE_INFINITY) { + cutoff = (this.upperLimit - this.lowerLimit) / (size - 2 - 1); + } else { + if (this.upperLimit != Float.POSITIVE_INFINITY) { + cutoff = (this.upperLimit - this.min) / (size - 2); + } else if (this.lowerLimit != Float.NEGATIVE_INFINITY) { + cutoff = (this.max - this.lowerLimit) / (size - 2); + } else { + cutoff = (this.max - this.min) / (size - 1); + } + } + + float lowerPosition = 0f; + long lowerBin = 0; + float upperPosition = 0f; + long upperBin = 0; + + int j = 0; + int k = 0; + int pos = 0; + + // continuously merge the left histogram below the lower limit + while (j != leftBinCount) { + final float m1 = leftPositions[j]; + if (m1 < lowerLimit) { + final long k1 = leftBins[j] & COUNT_BITS; + float delta = (m1 - lowerPosition); + final long k0 = lowerBin & COUNT_BITS; + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + lowerPosition = -delta * w + m1; + // set approximate flag + lowerBin = sum | APPROX_FLAG_BIT; + ++j; + } else { + break; + } + } + + // continuously merge the right histogram below the lower limit + while (k != rightBinCount) { + final float m1 = rightPositions[k]; + if (m1 < lowerLimit) { + final long k1 = rightBins[k] & COUNT_BITS; + float delta = (m1 - lowerPosition); + final long k0 = lowerBin & COUNT_BITS; + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + lowerPosition = -delta * w + m1; + // set approximate flag + lowerBin = sum | APPROX_FLAG_BIT; + ++k; + } else { + break; + } + } + + // if there are values below the lower limit, store them in array position 0 + if ((lowerBin & COUNT_BITS) > 0) { + mergedPositions[0] = lowerPosition; + mergedBins[0] = lowerBin; + pos = 1; + } + + // if there are values below the lower limit, fill in array position 1 + // else array position 0 + while (j != leftBinCount || k != rightBinCount) { + if (j != leftBinCount && (k == rightBinCount || leftPositions[j] < rightPositions[k])) { + mergedPositions[pos] = leftPositions[j]; + mergedBins[pos] = leftBins[j]; + ++j; + break; + } else { + mergedPositions[pos] = rightPositions[k]; + mergedBins[pos] = rightBins[k]; + ++k; + break; + } + } + + while (j != leftBinCount || k != rightBinCount) { + if (j != leftBinCount && (k == rightBinCount || leftPositions[j] < rightPositions[k])) { + final float m1 = leftPositions[j]; + final long k1 = leftBins[j] & COUNT_BITS; + + // above the upper limit gets merged continuously in the left histogram + if (m1 > upperLimit) { + float delta = (m1 - upperPosition); + final long k0 = upperBin & COUNT_BITS; + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + upperPosition = -delta * w + m1; + // set approximate flag + upperBin = sum | APPROX_FLAG_BIT; + ++j; + continue; + } + + final float delta = (m1 - mergedPositions[pos]); + + if (delta <= cutoff) { + final long k0 = mergedBins[pos] & COUNT_BITS; + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + mergedPositions[pos] = -delta * w + m1; + // set approximate flag + mergedBins[pos] = sum | APPROX_FLAG_BIT; + } else { + ++pos; + mergedPositions[pos] = m1; + mergedBins[pos] = k1; + } + ++j; + } else { + final float m1 = rightPositions[k]; + final long k1 = rightBins[k] & COUNT_BITS; + + // above the upper limit gets merged continuously in the right histogram + if (m1 > upperLimit) { + float delta = (m1 - upperPosition); + final long k0 = upperBin & COUNT_BITS; + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + upperPosition = -delta * w + m1; + // set approximate flag + upperBin = sum | APPROX_FLAG_BIT; + ++k; + continue; + } + + final float delta = (m1 - mergedPositions[pos]); + + if (delta <= cutoff) { + final long k0 = mergedBins[pos] & COUNT_BITS; + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + mergedPositions[pos] = -delta * w + m1; + mergedBins[pos] = sum | APPROX_FLAG_BIT; + } else { + ++pos; + mergedPositions[pos] = m1; + mergedBins[pos] = k1; + } + ++k; + } + } + + if ((upperBin & COUNT_BITS) > 0) { + ++pos; + mergedPositions[pos] = upperPosition; + mergedBins[pos] = upperBin; + } + + return pos + 1; + } + + + /** + * mergeBins performs the given number of bin merge operations on the given histogram + *

+ * It repeatedly merges the two closest bins until it has performed the requested number of merge operations. + * Merges are done in-place and unused bins have unknown state + *

+ * next / prev maintains a doubly-linked list of valid bin indices into the mergedBins array. + *

+ * Fast operation is achieved by building a min-heap of the deltas as opposed to repeatedly + * scanning the array of deltas to find the minimum. A reverse index into the heap is maintained + * to allow deleting and updating of specific deltas. + *

+ * next and prev arrays are used to maintain indices to the previous / next valid bin from a given bin index + *

+ * Its effect is equivalent to running the following code: + *

+ *

+   *   ApproximateHistogram merged = new ApproximateHistogram(mergedBinCount, mergedPositions, mergedBins);
+   *
+   *   int targetSize = merged.binCount() - numMerge;
+   *   while(merged.binCount() > targetSize) {
+   *     merged.merge(merged.minDeltaIndex());
+   *   }
+   * 
+ * + * @param mergedBinCount + * @param mergedPositions + * @param mergedBins + * @param deltas + * @param numMerge + * @param next + * @param prev + * + * @return the last valid index into the mergedPositions and mergedBins arrays + */ + private static void mergeBins( + int mergedBinCount, float[] mergedPositions, + long[] mergedBins, + float[] deltas, + int numMerge, + int[] next, + int[] prev + ) + { + // repeatedly search for two closest bins, merge them and update the corresponding deltas + + // maintain index to the last valid bin + int lastValidIndex = mergedBinCount - 1; + + // initialize prev / next lookup arrays + for (int i = 0; i < mergedBinCount; ++i) { + next[i] = i + 1; + } + for (int i = 0; i < mergedBinCount; ++i) { + prev[i] = i - 1; + } + + // initialize min-heap of deltas and the reverse index into the heap + int heapSize = mergedBinCount - 1; + int[] heap = new int[heapSize]; + int[] reverseIndex = new int[heapSize]; + for (int i = 0; i < heapSize; ++i) { + heap[i] = i; + } + for (int i = 0; i < heapSize; ++i) { + reverseIndex[i] = i; + } + + heapify(heap, reverseIndex, heapSize, deltas); + + { + int i = 0; + while (i < numMerge) { + // find the smallest delta within the range used for bins + + // pick minimum delta by scanning array + //int currentIndex = minIndex(deltas, lastValidIndex); + + // pick minimum delta index using min-heap + int currentIndex = heap[0]; + + final int nextIndex = next[currentIndex]; + final int prevIndex = prev[currentIndex]; + + final long k0 = mergedBins[currentIndex] & COUNT_BITS; + final long k1 = mergedBins[nextIndex] & COUNT_BITS; + final float m0 = mergedPositions[currentIndex]; + final float m1 = mergedPositions[nextIndex]; + final float d1 = deltas[nextIndex]; + + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + + // merge bin at given position with the next bin + final float mm0 = (m0 - m1) * w + m1; + + mergedPositions[currentIndex] = mm0; + //mergedPositions[nextIndex] = Float.MAX_VALUE; // for debugging + + mergedBins[currentIndex] = sum | APPROX_FLAG_BIT; + //mergedBins[nextIndex] = -1; // for debugging + + // update deltas and min-heap + if (nextIndex == lastValidIndex) { + // merged bin is the last => remove the current bin delta from the heap + heapSize = heapDelete(heap, reverseIndex, heapSize, reverseIndex[currentIndex], deltas); + + //deltas[currentIndex] = Float.MAX_VALUE; // for debugging + } else { + // merged bin is not the last => remove the merged bin delta from the heap + heapSize = heapDelete(heap, reverseIndex, heapSize, reverseIndex[nextIndex], deltas); + + // updated current delta + deltas[currentIndex] = m1 - mm0 + d1; + + // updated delta is necessarily larger than existing one, therefore we only need to push it down the heap + siftDown(heap, reverseIndex, reverseIndex[currentIndex], heapSize - 1, deltas); + } + + if (prevIndex >= 0) { + // current bin is not the first, therefore update the previous bin delta + deltas[prevIndex] = mm0 - mergedPositions[prevIndex]; + + // updated previous bin delta is necessarily larger than its existing value => push down the heap + siftDown(heap, reverseIndex, reverseIndex[prevIndex], heapSize - 1, deltas); + } + + // mark the merged bin as invalid + // deltas[nextIndex] = Float.MAX_VALUE; // for debugging + + // update last valid index if we merged the last bin + if (nextIndex == lastValidIndex) { + lastValidIndex = currentIndex; + } + + next[currentIndex] = next[nextIndex]; + if (nextIndex < lastValidIndex) { + prev[next[nextIndex]] = currentIndex; + } + + ++i; + } + } + } + + /** + * Builds a min-heap and a reverseIndex into the heap from the given array of values + * + * @param heap min-heap stored as indices into the array of values + * @param reverseIndex reverse index from the array of values into the heap + * @param count current size of the heap + * @param values values to be stored in the heap + */ + private static void heapify(int[] heap, int[] reverseIndex, int count, float[] values) + { + int start = (count - 2) / 2; + while (start >= 0) { + siftDown(heap, reverseIndex, start, count - 1, values); + start--; + } + } + + /** + * Rebalances the min-heap by pushing values from the top down and simultaneously updating the reverse index + * + * @param heap min-heap stored as indices into the array of values + * @param reverseIndex reverse index from the array of values into the heap + * @param start index to start re-balancing from + * @param end index to stop re-balancing at + * @param values values stored in the heap + */ + private static void siftDown(int[] heap, int[] reverseIndex, int start, int end, float[] values) + { + int root = start; + while (root * 2 + 1 <= end) { + int child = root * 2 + 1; + int swap = root; + if (values[heap[swap]] > values[heap[child]]) { + swap = child; + } + if (child + 1 <= end && values[heap[swap]] > values[heap[child + 1]]) { + swap = child + 1; + } + if (swap != root) { + // swap + int tmp = heap[swap]; + heap[swap] = heap[root]; + heap[root] = tmp; + + // heap index from delta index + reverseIndex[heap[swap]] = swap; + reverseIndex[heap[root]] = root; + + root = swap; + } else { + return; + } + } + } + + /** + * Deletes an item from the min-heap and updates the reverse index + * + * @param heap min-heap stored as indices into the array of values + * @param reverseIndex reverse index from the array of values into the heap + * @param count current size of the heap + * @param heapIndex index of the item to be deleted + * @param values values stored in the heap + * + * @return + */ + private static int heapDelete(int[] heap, int[] reverseIndex, int count, int heapIndex, float[] values) + { + int end = count - 1; + + reverseIndex[heap[heapIndex]] = -1; + + heap[heapIndex] = heap[end]; + reverseIndex[heap[heapIndex]] = heapIndex; + + end--; + siftDown(heap, reverseIndex, heapIndex, end, values); + return count - 1; + } + + private static int minIndex(float[] deltas, int lastValidIndex) + { + int minIndex = -1; + float min = Float.MAX_VALUE; + for (int k = 0; k < lastValidIndex; ++k) { + float value = deltas[k]; + if (value < min) { + minIndex = k; + min = value; + } + } + return minIndex; + } + + /** + * Combines two sets of histogram bins using merge-sort and computes the delta between consecutive bin positions. + * Duplicate bins are merged together. + * + * @param leftBinCount + * @param leftPositions + * @param leftBins + * @param rightBinCount + * @param rightPositions + * @param rightBins + * @param mergedPositions array to store the combined bin positions (size must be at least leftBinCount + rightBinCount) + * @param mergedBins array to store the combined bin counts (size must be at least leftBinCount + rightBinCount) + * @param deltas deltas between consecutive bin positions in the merged bins (size must be at least leftBinCount + rightBinCount) + * + * @return the number of combined bins + */ + private static int combineBins( + int leftBinCount, float[] leftPositions, long[] leftBins, + int rightBinCount, float[] rightPositions, long[] rightBins, + float[] mergedPositions, long[] mergedBins, float[] deltas + ) + { + int i = 0; + int j = 0; + int k = 0; + while (j < leftBinCount || k < rightBinCount) { + if (j < leftBinCount && (k == rightBinCount || leftPositions[j] < rightPositions[k])) { + mergedPositions[i] = leftPositions[j]; + mergedBins[i] = leftBins[j]; + ++j; + } else if (k < rightBinCount && (j == leftBinCount || leftPositions[j] > rightPositions[k])) { + mergedPositions[i] = rightPositions[k]; + mergedBins[i] = rightBins[k]; + ++k; + } else { + // combine overlapping bins + mergedPositions[i] = leftPositions[j]; + mergedBins[i] = leftBins[j] + rightBins[k]; + ++j; + ++k; + } + if (deltas != null && i > 0) { + deltas[i - 1] = mergedPositions[i] - mergedPositions[i - 1]; + } + ++i; + } + return i; + } + + /** + * Returns a byte-array representation of this ApproximateHistogram object + * + * @return + */ + @JsonValue + public byte[] toBytes() + { + ByteBuffer buf = ByteBuffer.allocate(getMinStorageSize()); + toBytes(buf); + return buf.array(); + } + + + public int getDenseStorageSize() + { + return Ints.BYTES * 2 + Floats.BYTES * size + Longs.BYTES * size + Floats.BYTES * 2; + } + + public int getSparseStorageSize() + { + return Ints.BYTES * 2 + Floats.BYTES * binCount + Longs.BYTES * binCount + Floats.BYTES * 2; + } + + public int getCompactStorageSize() + { + // ensures exactCount and (count - exactCount) can safely be cast to (int) + Preconditions.checkState(canStoreCompact(), "Approximate histogram cannot be stored in compact form"); + + final long exactCount = getExactCount(); + if (exactCount == count) { + return Shorts.BYTES + 1 + Floats.BYTES * (int) exactCount; + } else { + return Shorts.BYTES + + 1 + + Floats.BYTES * (int) exactCount + + 1 + + Floats.BYTES * (int) (count - exactCount) + + Floats.BYTES * 2; + } + } + + public int getMaxStorageSize() + { + return getDenseStorageSize(); + } + + /** + * Returns the minimum number of bytes required to store this ApproximateHistogram object + * + * @return required number of bytes + */ + public int getMinStorageSize() + { + // sparse is always small than dense, so no need to check + if (canStoreCompact() && getCompactStorageSize() < getSparseStorageSize()) { + return getCompactStorageSize(); + } else { + return getSparseStorageSize(); + } + } + + /** + * Checks whether this approximate histogram can be stored in a compact form + * + * @return true if yes, false otherwise + */ + public boolean canStoreCompact() + { + final long exactCount = getExactCount(); + return ( + size <= Short.MAX_VALUE + && exactCount <= Byte.MAX_VALUE + && (count - exactCount) <= Byte.MAX_VALUE + ); + } + + /** + * Writes the representation of this ApproximateHistogram object to the given byte-buffer + * + * @param buf + */ + public void toBytes(ByteBuffer buf) + { + if (canStoreCompact() && getCompactStorageSize() < getSparseStorageSize()) { + // store compact + toBytesCompact(buf); + } else { + // store sparse + toBytesSparse(buf); + } + } + + /** + * Writes the dense representation of this ApproximateHistogram object to the given byte-buffer + *

+ * Requires 16 + 12 * size bytes of storage + * + * @param buf + */ + public void toBytesDense(ByteBuffer buf) + { + buf.putInt(size); + buf.putInt(binCount); + + buf.asFloatBuffer().put(positions); + buf.position(buf.position() + Floats.BYTES * positions.length); + buf.asLongBuffer().put(bins); + buf.position(buf.position() + Longs.BYTES * bins.length); + + buf.putFloat(min); + buf.putFloat(max); + } + + /** + * Writes the sparse representation of this ApproximateHistogram object to the given byte-buffer + *

+ * Requires 16 + 12 * binCount bytes of storage + * + * @param buf ByteBuffer to write object to + */ + public void toBytesSparse(ByteBuffer buf) + { + buf.putInt(size); + buf.putInt(-1 * binCount); // use negative binCount to indicate sparse storage + for (int i = 0; i < binCount; ++i) { + buf.putFloat(positions[i]); + } + for (int i = 0; i < binCount; ++i) { + buf.putLong(bins[i]); + } + buf.putFloat(min); + buf.putFloat(max); + } + + /** + * Returns a compact byte-buffer representation of this ApproximateHistogram object + * storing actual values as opposed to histogram bins + *

+ * Requires 3 + 4 * count bytes of storage with count <= 127 + * + * @param buf + */ + public void toBytesCompact(ByteBuffer buf) + { + Preconditions.checkState(canStoreCompact(), "Approximate histogram cannot be stored in compact form"); + + buf.putShort((short) (-1 * size)); // use negative size to indicate compact storage + + final long exactCount = getExactCount(); + if (exactCount != count) { + // use negative count to indicate approximate bins + buf.put((byte) (-1 * (count - exactCount))); + + // store actual values instead of bins + for (int i = 0; i < binCount; ++i) { + // repeat each value bins[i] times for approximate bins + if ((bins[i] & APPROX_FLAG_BIT) != 0) { + for (int k = 0; k < (bins[i] & COUNT_BITS); ++k) { + buf.putFloat(positions[i]); + } + } + } + + // tack on min and max since they may be lost int the approximate bins + buf.putFloat(min); + buf.putFloat(max); + } + + buf.put((byte) exactCount); + // store actual values instead of bins + for (int i = 0; i < binCount; ++i) { + // repeat each value bins[i] times for exact bins + if ((bins[i] & APPROX_FLAG_BIT) == 0) { + for (int k = 0; k < (bins[i] & COUNT_BITS); ++k) { + buf.putFloat(positions[i]); + } + } + } + } + + /** + * Constructs an Approximate Histogram object from the given byte-array representation + * + * @param bytes + * + * @return + */ + public static ApproximateHistogram fromBytes(byte[] bytes) + { + ByteBuffer buf = ByteBuffer.wrap(bytes); + return fromBytes(buf); + } + + /** + * Constructs an ApproximateHistogram object from the given dense byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytesDense(ByteBuffer buf) + { + int size = buf.getInt(); + int binCount = buf.getInt(); + + float[] positions = new float[size]; + long[] bins = new long[size]; + + buf.asFloatBuffer().get(positions); + buf.position(buf.position() + Floats.BYTES * positions.length); + buf.asLongBuffer().get(bins); + buf.position(buf.position() + Longs.BYTES * bins.length); + + float min = buf.getFloat(); + float max = buf.getFloat(); + + return new ApproximateHistogram(binCount, positions, bins, min, max); + } + + /** + * Constructs an ApproximateHistogram object from the given dense byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytesSparse(ByteBuffer buf) + { + int size = buf.getInt(); + int binCount = -1 * buf.getInt(); + + float[] positions = new float[size]; + long[] bins = new long[size]; + + for (int i = 0; i < binCount; ++i) { + positions[i] = buf.getFloat(); + } + for (int i = 0; i < binCount; ++i) { + bins[i] = buf.getLong(); + } + + float min = buf.getFloat(); + float max = buf.getFloat(); + + return new ApproximateHistogram(binCount, positions, bins, min, max); + } + + /** + * Constructs an ApproximateHistogram object from the given compact byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytesCompact(ByteBuffer buf) + { + short size = (short) (-1 * buf.getShort()); + byte count = buf.get(); + + if (count >= 0) { + // only exact bins + ApproximateHistogram histogram = new ApproximateHistogram(size); + for (int i = 0; i < count; ++i) { + histogram.offer(buf.getFloat()); + } + return histogram; + } else { + byte approxCount = (byte) (-1 * count); + + Map approx = Maps.newHashMap(); + + for (int i = 0; i < approxCount; ++i) { + final float value = buf.getFloat(); + if (approx.containsKey(value)) { + approx.put(value, approx.get(value) + 1); + } else { + approx.put(value, 1L); + } + } + + float min = buf.getFloat(); + float max = buf.getFloat(); + + byte exactCount = buf.get(); + + Map exact = Maps.newHashMap(); + + for (int i = 0; i < exactCount; ++i) { + final float value = buf.getFloat(); + if (exact.containsKey(value)) { + exact.put(value, exact.get(value) + 1); + } else { + exact.put(value, 1L); + } + } + + int binCount = exact.size() + approx.size(); + + List pos = Lists.newArrayList(); + pos.addAll(exact.keySet()); + pos.addAll(approx.keySet()); + Collections.sort(pos); + + float[] positions = new float[size]; + long[] bins = new long[size]; + + for (int i = 0; i < pos.size(); ++i) { + positions[i] = pos.get(i); + } + + for (int i = 0; i < pos.size(); ++i) { + final float value = pos.get(i); + if (exact.containsKey(value)) { + bins[i] = exact.get(value); + } else { + bins[i] = approx.get(value) | APPROX_FLAG_BIT; + } + } + + return new ApproximateHistogram(binCount, positions, bins, min, max); + } + } + + /** + * Constructs an ApproximateHistogram object from the given byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytes(ByteBuffer buf) + { + ByteBuffer copy = buf.asReadOnlyBuffer(); + // negative size indicates compact representation + // this works regardless of whether we use int or short for the size since the leftmost bit is the sign bit + if (copy.getShort(buf.position()) < 0) { + return fromBytesCompact(buf); + } else { + // ignore size + copy.getInt(); + // determine if sparse or dense based on sign of binCount + if (copy.getInt() < 0) { + return fromBytesSparse(buf); + } else { + return fromBytesDense(buf); + } + } + } + + /** + * Returns the approximate number of items less than or equal to b in the histogram + * + * @param b + * + * @return the approximate number of items less than or equal to b + */ + public double sum(final float b) + { + if (b < min) { + return 0; + } + if (b >= max) { + return count; + } + + int index = Arrays.binarySearch(positions, 0, binCount, b); + boolean exactMatch = index >= 0; + index = exactMatch ? index : -(index + 1); + + // we want positions[index] <= b < positions[index+1] + if (!exactMatch) { + index--; + } + + final boolean outerLeft = index < 0; + final boolean outerRight = index >= (binCount - 1); + + final long m0 = outerLeft ? 0 : (bins[index] & COUNT_BITS); + final long m1 = outerRight ? 0 : (bins[index + 1] & COUNT_BITS); + final double p0 = outerLeft ? min : positions[index]; + final double p1 = outerRight ? max : positions[index + 1]; + final boolean exact0 = (!outerLeft && (bins[index] & APPROX_FLAG_BIT) == 0); + final boolean exact1 = (!outerRight && (bins[index + 1] & APPROX_FLAG_BIT) == 0); + + // handle case when p0 = p1, which happens if the first bin = min or the last bin = max + final double l = (p1 == p0) ? 0 : (b - p0) / (p1 - p0); + + // don't include exact counts in the trapezoid calculation + long tm0 = m0; + long tm1 = m1; + if (exact0) { + tm0 = 0; + } + if (exact1) { + tm1 = 0; + } + final double mb = tm0 + (tm1 - tm0) * l; + double s = 0.5 * (tm0 + mb) * l; + + for (int i = 0; i < index; ++i) { + s += (bins[i] & COUNT_BITS); + } + + // add full bin count if left bin count is exact + if (exact0) { + return (s + m0); + } + + // otherwise add only the left half of the bin + else { + return (s + 0.5 * m0); + } + } + + /** + * Returns the approximate quantiles corresponding to the given probabilities. + * probabilities = [.5f] returns [median] + * probabilities = [.25f, .5f, .75f] returns the quartiles, [25%ile, median, 75%ile] + * + * @param probabilities + * + * @return an array of length probabilities.length representing the the approximate sample quantiles + * corresponding to the given probabilities + */ + + public float[] getQuantiles(float[] probabilities) + { + for (float p : probabilities) { + Preconditions.checkArgument(0 < p & p < 1, "quantile probabilities must be strictly between 0 and 1"); + } + + float[] quantiles = new float[probabilities.length]; + Arrays.fill(quantiles, Float.NaN); + + if (this.count() == 0) { + return quantiles; + } + + final long[] bins = this.bins(); + + for (int j = 0; j < probabilities.length; ++j) { + final double s = probabilities[j] * this.count(); + + int i = 0; + int sum = 0; + int k = 1; + long count = 0; + while (k <= this.binCount()) { + count = bins[k - 1]; + if (sum + count > s) { + i = k - 1; + break; + } else { + sum += count; + } + ++k; + } + + if (i == 0) { + quantiles[j] = this.min(); + } else { + final double d = s - sum; + final double c = -2 * d; + final long a = bins[i] - bins[i - 1]; + final long b = 2 * bins[i - 1]; + double z = 0; + if (a == 0) { + z = -c / b; + } else { + z = (-b + Math.sqrt(b * b - 4 * a * c)) / (2 * a); + } + final double uj = this.positions[i - 1] + (this.positions[i] - this.positions[i - 1]) * z; + quantiles[j] = (float) uj; + } + } + + return quantiles; + } + + /** + * Computes a visual representation of the approximate histogram with bins laid out according to the given breaks + * + * @param breaks + * + * @return + */ + public Histogram toHistogram(final float[] breaks) + { + final double[] approximateBins = new double[breaks.length - 1]; + + double prev = sum(breaks[0]); + for (int i = 1; i < breaks.length; ++i) { + double s = sum(breaks[i]); + approximateBins[i - 1] = (float) (s - prev); + prev = s; + } + + return new Histogram(breaks, approximateBins); + } + + /** + * Computes a visual representation of the approximate histogram with a given number of equal-sized bins + * + * @param size number of equal-sized bins to divide the histogram into + * + * @return + */ + public Histogram toHistogram(int size) + { + Preconditions.checkArgument(size > 1, "histogram size must be greater than 1"); + + float[] breaks = new float[size + 1]; + float delta = (max - min) / (size - 1); + breaks[0] = min - delta; + for (int i = 1; i < breaks.length - 1; ++i) { + breaks[i] = breaks[i - 1] + delta; + } + breaks[breaks.length - 1] = max; + return toHistogram(breaks); + } + + /** + * Computes a visual representation given an initial breakpoint, offset, and a bucket size. + * + * @param bucketSize the size of each bucket + * @param offset the location of one breakpoint + * + * @return + */ + public Histogram toHistogram(final float bucketSize, final float offset) + { + final float minFloor = (float) Math.floor((min() - offset) / bucketSize) * bucketSize + offset; + final float lowerLimitFloor = (float) Math.floor((lowerLimit - offset) / bucketSize) * bucketSize + offset; + final float firstBreak = Math.max(minFloor, lowerLimitFloor); + + final float maxCeil = (float) Math.ceil((max() - offset) / bucketSize) * bucketSize + offset; + final float upperLimitCeil = (float) Math.ceil((upperLimit - offset) / bucketSize) * bucketSize + offset; + final float lastBreak = Math.min(maxCeil, upperLimitCeil); + + final float cutoff = 0.1f; + + final ArrayList breaks = new ArrayList(); + + // to deal with left inclusivity when the min is the same as a break + final float bottomBreak = minFloor - bucketSize; + if (bottomBreak != firstBreak && (sum(firstBreak) - sum(bottomBreak) > cutoff)) { + breaks.add(bottomBreak); + } + + float left = firstBreak; + boolean leftSet = false; + + //the + bucketSize / 10 is because floating point addition is always slightly incorrect and so we need to account for that + while (left + bucketSize <= lastBreak + (bucketSize / 10)) { + final float right = left + bucketSize; + + if (sum(right) - sum(left) > cutoff) { + if (!leftSet) { + breaks.add(left); + } + breaks.add(right); + leftSet = true; + } else { + leftSet = false; + } + + left = right; + } + + if (breaks.get(breaks.size() - 1) != maxCeil && (sum(maxCeil) - sum(breaks.get(breaks.size() - 1)) > cutoff)) { + breaks.add(maxCeil); + } + + return toHistogram(Floats.toArray(breaks)); + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java new file mode 100644 index 00000000000..7fcac6d7213 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java @@ -0,0 +1,103 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.google.common.primitives.Longs; +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; + +import java.util.Comparator; + +public class ApproximateHistogramAggregator implements Aggregator +{ + public static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare(Object o, Object o1) + { + return Longs.compare(((ApproximateHistogram) o).count(), ((ApproximateHistogram) o1).count()); + } + }; + + static Object combineHistograms(Object lhs, Object rhs) + { + return ((ApproximateHistogram) lhs).foldFast((ApproximateHistogram) rhs); + } + + private final String name; + private final FloatColumnSelector selector; + private final int resolution; + private final float lowerLimit; + private final float upperLimit; + + private ApproximateHistogram histogram; + + public ApproximateHistogramAggregator( + String name, + FloatColumnSelector selector, + int resolution, + float lowerLimit, + float upperLimit + ) + { + this.name = name; + this.selector = selector; + this.resolution = resolution; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + this.histogram = new ApproximateHistogram(resolution, lowerLimit, upperLimit); + } + + @Override + public void aggregate() + { + histogram.offer(selector.get()); + } + + @Override + public void reset() + { + this.histogram = new ApproximateHistogram(resolution, lowerLimit, upperLimit); + } + + @Override + public Object get() + { + return histogram; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java new file mode 100644 index 00000000000..603d07765ee --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -0,0 +1,253 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import org.apache.commons.codec.binary.Base64; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +@JsonTypeName("approxHistogram") +public class ApproximateHistogramAggregatorFactory implements AggregatorFactory +{ + private static final byte CACHE_TYPE_ID = 0x8; + + protected final String name; + protected final String fieldName; + + protected final int resolution; + protected final int numBuckets; + + protected final float lowerLimit; + protected final float upperLimit; + + @JsonCreator + public ApproximateHistogramAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("resolution") Integer resolution, + @JsonProperty("numBuckets") Integer numBuckets, + @JsonProperty("lowerLimit") Float lowerLimit, + @JsonProperty("upperLimit") Float upperLimit + + ) + { + this.name = name; + this.fieldName = fieldName.toLowerCase(); + this.resolution = resolution == null ? ApproximateHistogram.DEFAULT_HISTOGRAM_SIZE : resolution; + this.numBuckets = numBuckets == null ? ApproximateHistogram.DEFAULT_BUCKET_SIZE : numBuckets; + this.lowerLimit = lowerLimit == null ? Float.NEGATIVE_INFINITY : lowerLimit; + this.upperLimit = upperLimit == null ? Float.POSITIVE_INFINITY : upperLimit; + + Preconditions.checkArgument(this.resolution > 0, "resolution must be greater than 1"); + Preconditions.checkArgument(this.numBuckets > 0, "numBuckets must be greater than 1"); + Preconditions.checkArgument(this.upperLimit > this.lowerLimit, "upperLimit must be greater than lowerLimit"); + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new ApproximateHistogramAggregator( + name, + metricFactory.makeFloatColumnSelector(fieldName), + resolution, + lowerLimit, + upperLimit + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new ApproximateHistogramBufferAggregator( + metricFactory.makeFloatColumnSelector(fieldName), + resolution, + lowerLimit, + upperLimit + ); + } + + @Override + public Comparator getComparator() + { + return ApproximateHistogramAggregator.COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return ApproximateHistogramAggregator.combineHistograms(lhs, rhs); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new ApproximateHistogramAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit); + } + + @Override + public List getRequiredColumns() + { + return Arrays.asList( + new ApproximateHistogramAggregatorFactory( + fieldName, + fieldName, + resolution, + numBuckets, + lowerLimit, + upperLimit + ) + ); + } + + @Override + public Object deserialize(Object object) + { + if (object instanceof byte[]) { + final ApproximateHistogram ah = ApproximateHistogram.fromBytes((byte[]) object); + ah.setLowerLimit(lowerLimit); + ah.setUpperLimit(upperLimit); + + return ah; + } else if (object instanceof ByteBuffer) { + final ApproximateHistogram ah = ApproximateHistogram.fromBytes((ByteBuffer) object); + ah.setLowerLimit(lowerLimit); + ah.setUpperLimit(upperLimit); + + return ah; + } else if (object instanceof String) { + byte[] bytes = Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8)); + final ApproximateHistogram ah = ApproximateHistogram.fromBytes(bytes); + ah.setLowerLimit(lowerLimit); + ah.setUpperLimit(upperLimit); + + return ah; + } else { + return object; + } + } + + @Override + public Object finalizeComputation(Object object) + { + return ((ApproximateHistogram) object).toHistogram(numBuckets); + } + + @JsonProperty + @Override + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @JsonProperty + public int getResolution() + { + return resolution; + } + + @JsonProperty + public float getLowerLimit() + { + return lowerLimit; + } + + @JsonProperty + public float getUpperLimit() + { + return upperLimit; + } + + @JsonProperty + public int getNumBuckets() + { + return numBuckets; + } + + @Override + public List requiredFields() + { + return Arrays.asList(fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8); + return ByteBuffer.allocate(1 + fieldNameBytes.length + Ints.BYTES * 2 + Floats.BYTES * 2) + .put(CACHE_TYPE_ID) + .put(fieldNameBytes) + .putInt(resolution) + .putInt(numBuckets) + .putFloat(lowerLimit) + .putFloat(upperLimit).array(); + } + + @Override + public String getTypeName() + { + return "approximateHistogram"; + } + + @Override + public int getMaxIntermediateSize() + { + return new ApproximateHistogram(resolution).getMaxStorageSize(); + } + + @Override + public Object getAggregatorStartValue() + { + return new ApproximateHistogram(resolution); + } + + @Override + public String toString() + { + return "ApproximateHistogramAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", resolution=" + resolution + + ", numBuckets=" + numBuckets + + ", lowerLimit=" + lowerLimit + + ", upperLimit=" + upperLimit + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java new file mode 100644 index 00000000000..dbd566f2693 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java @@ -0,0 +1,95 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.FloatColumnSelector; + +import java.nio.ByteBuffer; + +public class ApproximateHistogramBufferAggregator implements BufferAggregator +{ + private final FloatColumnSelector selector; + private final int resolution; + private final float lowerLimit; + private final float upperLimit; + + public ApproximateHistogramBufferAggregator(FloatColumnSelector selector, int resolution, float lowerLimit, float upperLimit) + { + this.selector = selector; + this.resolution = resolution; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + } + + @Override + public void init(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + mutationBuffer.putInt(resolution); + mutationBuffer.putInt(0); //initial binCount + for (int i = 0; i < resolution; ++i) { + mutationBuffer.putFloat(0f); + } + for (int i = 0; i < resolution; ++i) { + mutationBuffer.putLong(0L); + } + + // min + mutationBuffer.putFloat(Float.POSITIVE_INFINITY); + // max + mutationBuffer.putFloat(Float.NEGATIVE_INFINITY); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer); + h0.offer(selector.get()); + + mutationBuffer.position(position); + h0.toBytesDense(mutationBuffer); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + return ApproximateHistogram.fromBytes(mutationBuffer); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getFloat()"); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramDruidModule.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramDruidModule.java new file mode 100644 index 00000000000..592ca2c8d4b --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramDruidModule.java @@ -0,0 +1,60 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; +import io.druid.segment.serde.ComplexMetrics; + +import java.util.List; + +/** + */ +public class ApproximateHistogramDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule().registerSubtypes( + ApproximateHistogramFoldingAggregatorFactory.class, + ApproximateHistogramAggregatorFactory.class, + EqualBucketsPostAggregator.class, + CustomBucketsPostAggregator.class, + BucketsPostAggregator.class, + QuantilesPostAggregator.class, + QuantilePostAggregator.class, + MinPostAggregator.class, + MaxPostAggregator.class + ) + ); + } + + @Override + public void configure(Binder binder) + { + if (ComplexMetrics.getSerdeForType("approximateHistogram") == null) { + ComplexMetrics.registerSerde("approximateHistogram", new ApproximateHistogramFoldingSerde()); + } + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java new file mode 100644 index 00000000000..51dc682de0a --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java @@ -0,0 +1,101 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.ObjectColumnSelector; + +public class ApproximateHistogramFoldingAggregator implements Aggregator +{ + private final String name; + private final ObjectColumnSelector selector; + private final int resolution; + private final float lowerLimit; + private final float upperLimit; + + private ApproximateHistogram histogram; + private float[] tmpBufferP; + private long[] tmpBufferB; + + public ApproximateHistogramFoldingAggregator( + String name, + ObjectColumnSelector selector, + int resolution, + float lowerLimit, + float upperLimit + ) + { + this.name = name; + this.selector = selector; + this.resolution = resolution; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + this.histogram = new ApproximateHistogram(resolution, lowerLimit, upperLimit); + + tmpBufferP = new float[resolution]; + tmpBufferB = new long[resolution]; + } + + @Override + public void aggregate() + { + ApproximateHistogram h = selector.get(); + if (h == null) { + return; + } + + if (h.binCount() + histogram.binCount() <= tmpBufferB.length) { + histogram.foldFast(h, tmpBufferP, tmpBufferB); + } else { + histogram.foldFast(h); + } + } + + @Override + public void reset() + { + this.histogram = new ApproximateHistogram(resolution, lowerLimit, upperLimit); + } + + @Override + public Object get() + { + return histogram; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java new file mode 100644 index 00000000000..c5bb1e552c5 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java @@ -0,0 +1,164 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Charsets; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.metamx.common.IAE; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ObjectColumnSelector; + +import java.nio.ByteBuffer; + +@JsonTypeName("approxHistogramFold") +public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHistogramAggregatorFactory +{ + private static final byte CACHE_TYPE_ID = 0x9; + + @JsonCreator + public ApproximateHistogramFoldingAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("resolution") Integer resolution, + @JsonProperty("numBuckets") Integer numBuckets, + @JsonProperty("lowerLimit") Float lowerLimit, + @JsonProperty("upperLimit") Float upperLimit + ) + { + super(name, fieldName, resolution, numBuckets, lowerLimit, upperLimit); + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + + if (selector == null) { + // gracefully handle undefined metrics + + selector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return ApproximateHistogram.class; + } + + @Override + public ApproximateHistogram get() + { + return new ApproximateHistogram(0); + } + }; + } + + if (ApproximateHistogram.class.isAssignableFrom(selector.classOfObject())) { + return new ApproximateHistogramFoldingAggregator( + name, + selector, + resolution, + lowerLimit, + upperLimit + ); + } + + throw new IAE( + "Incompatible type for metric[%s], expected a ApproximateHistogram, got a %s", + fieldName, + selector.classOfObject() + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + + if (selector == null) { + // gracefully handle undefined metrics + + selector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return ApproximateHistogram.class; + } + + @Override + public ApproximateHistogram get() + { + return new ApproximateHistogram(0); + } + }; + } + + if (ApproximateHistogram.class.isAssignableFrom(selector.classOfObject())) { + return new ApproximateHistogramFoldingBufferAggregator(selector, resolution, lowerLimit, upperLimit); + } + + throw new IAE( + "Incompatible type for metric[%s], expected a ApproximateHistogram, got a %s", + fieldName, + selector.classOfObject() + ); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8); + return ByteBuffer.allocate(1 + fieldNameBytes.length + Ints.BYTES * 2 + Floats.BYTES * 2) + .put(CACHE_TYPE_ID) + .put(fieldNameBytes) + .putInt(resolution) + .putInt(numBuckets) + .putFloat(lowerLimit) + .putFloat(upperLimit) + .array(); + } + + @Override + public String toString() + { + return "ApproximateHistogramFoldingAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", resolution=" + resolution + + ", numBuckets=" + numBuckets + + ", lowerLimit=" + lowerLimit + + ", upperLimit=" + upperLimit + + '}'; + } +} + diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java new file mode 100644 index 00000000000..4190ae50a40 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java @@ -0,0 +1,99 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ObjectColumnSelector; + +import java.nio.ByteBuffer; + +public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator +{ + private final ObjectColumnSelector selector; + private final int resolution; + private final float upperLimit; + private final float lowerLimit; + + private float[] tmpBufferP; + private long[] tmpBufferB; + + public ApproximateHistogramFoldingBufferAggregator( + ObjectColumnSelector selector, + int resolution, + float lowerLimit, + float upperLimit + ) + { + this.selector = selector; + this.resolution = resolution; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + + tmpBufferP = new float[resolution]; + tmpBufferB = new long[resolution]; + } + + @Override + public void init(ByteBuffer buf, int position) + { + ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit); + + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + // use dense storage for aggregation + h.toBytesDense(mutationBuffer); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer); + h0.setLowerLimit(lowerLimit); + h0.setUpperLimit(upperLimit); + ApproximateHistogram hNext = selector.get(); + h0.foldFast(hNext, tmpBufferP, tmpBufferB); + + mutationBuffer.position(position); + h0.toBytesDense(mutationBuffer); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.asReadOnlyBuffer(); + mutationBuffer.position(position); + return ApproximateHistogram.fromBytesDense(mutationBuffer); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getFloat()"); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java new file mode 100644 index 00000000000..d7b988792bc --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java @@ -0,0 +1,134 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.google.common.collect.Ordering; +import io.druid.data.input.InputRow; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.serde.ColumnPartSerde; +import io.druid.segment.serde.ComplexColumnPartSerde; +import io.druid.segment.serde.ComplexColumnPartSupplier; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; + +public class ApproximateHistogramFoldingSerde extends ComplexMetricSerde +{ + private static Ordering comparator = new Ordering() + { + @Override + public int compare( + ApproximateHistogram arg1, ApproximateHistogram arg2 + ) + { + return ApproximateHistogramAggregator.COMPARATOR.compare(arg1, arg2); + } + }.nullsFirst(); + + @Override + public String getTypeName() + { + return "approximateHistogram"; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return ApproximateHistogram.class; + } + + @Override + public ApproximateHistogram extractValue(InputRow inputRow, String metricName) + { + List dimValues = inputRow.getDimension(metricName); + if (dimValues != null && dimValues.size() > 0) { + Iterator values = dimValues.iterator(); + + ApproximateHistogram h = new ApproximateHistogram(); + + while (values.hasNext()) { + float value = Float.parseFloat(values.next()); + h.offer(value); + } + return h; + } else { + return new ApproximateHistogram(0); + } + } + }; + } + + @Override + public ColumnPartSerde deserializeColumn( + ByteBuffer byteBuffer, ColumnBuilder columnBuilder + ) + { + final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy()); + + columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); + + return new ComplexColumnPartSerde(column, getTypeName()); + } + + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public Class getClazz() + { + return ApproximateHistogram.class; + } + + @Override + public ApproximateHistogram fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); + return ApproximateHistogram.fromBytes(readOnlyBuffer); + } + + @Override + public byte[] toBytes(ApproximateHistogram h) + { + if (h == null) { + return new byte[]{}; + } + return h.toBytes(); + } + + @Override + public int compare(ApproximateHistogram o1, ApproximateHistogram o2) + { + return comparator.compare(o1, o2); + } + }; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramPostAggregator.java new file mode 100644 index 00000000000..92dc4a4ccb3 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramPostAggregator.java @@ -0,0 +1,68 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.query.aggregation.PostAggregator; + +import java.util.Comparator; +import java.util.Map; + +public abstract class ApproximateHistogramPostAggregator implements PostAggregator +{ + private static final Comparator COMPARATOR = ApproximateHistogramAggregator.COMPARATOR; + + private final String name; + private final String fieldName; + + public ApproximateHistogramPostAggregator( + String name, + String fieldName + ) + { + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Comparator getComparator() + { + return COMPARATOR; + } + + @Override + public abstract Object compute(Map values); + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public abstract String toString(); +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ArrayUtils.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ArrayUtils.java new file mode 100644 index 00000000000..9aba505ea23 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ArrayUtils.java @@ -0,0 +1,58 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +public class ArrayUtils +{ + public static int hashCode(long[] a, int fromIndex, int toIndex) + { + int hashCode = 1; + int i = fromIndex; + while (i < toIndex) { + long v = a[i]; + hashCode = 31 * hashCode + (int) (v ^ (v >>> 32)); + ++i; + } + return hashCode; + } + + public static int hashCode(float[] a, int fromIndex, int toIndex) + { + int hashCode = 1; + int i = fromIndex; + while (i < toIndex) { + hashCode = 31 * hashCode + Float.floatToIntBits(a[i]); + ++i; + } + return hashCode; + } + + public static int hashCode(double[] a, int fromIndex, int toIndex) + { + int hashCode = 1; + int i = fromIndex; + while (i < toIndex) { + long v = Double.doubleToLongBits(a[i]); + hashCode = 31 * hashCode + (int) (v ^ (v >>> 32)); + ++i; + } + return hashCode; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java new file mode 100644 index 00000000000..f9e6aa9be62 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java @@ -0,0 +1,91 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; +import com.metamx.common.IAE; + +import java.util.Map; +import java.util.Set; + +@JsonTypeName("buckets") +public class BucketsPostAggregator extends ApproximateHistogramPostAggregator +{ + private final float bucketSize; + private final float offset; + + private String fieldName; + + @JsonCreator + public BucketsPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("bucketSize") float bucketSize, + @JsonProperty("offset") float offset + ) + { + super(name, fieldName); + this.bucketSize = bucketSize; + if (this.bucketSize <= 0) { + throw new IAE("Illegal bucketSize [%s], must be > 0", this.bucketSize); + } + this.offset = offset; + this.fieldName = fieldName; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + return ah.toHistogram(bucketSize, offset); + } + + @JsonProperty + public float getBucketSize() + { + return bucketSize; + } + + @JsonProperty + public float getOffset() + { + return bucketSize; + } + + @Override + public String toString() + { + return "BucketsPostAggregator{" + + "name='" + this.getName() + '\'' + + ", fieldName='" + this.getFieldName() + '\'' + + ", bucketSize=" + this.getBucketSize() + + ", offset=" + this.getOffset() + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/BufferUtils.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/BufferUtils.java new file mode 100644 index 00000000000..823e6c59e36 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/BufferUtils.java @@ -0,0 +1,68 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; + +public class BufferUtils +{ + public static int binarySearch(DoubleBuffer buf, int minIndex, int maxIndex, double value) + { + while (minIndex < maxIndex) { + int currIndex = (minIndex + maxIndex - 1) >>> 1; + + double currValue = buf.get(currIndex); + int comparison = Double.compare(currValue, value); + if (comparison == 0) { + return currIndex; + } + + if (comparison < 0) { + minIndex = currIndex + 1; + } else { + maxIndex = currIndex; + } + } + + return -(minIndex + 1); + } + + public static int binarySearch(FloatBuffer buf, int minIndex, int maxIndex, float value) + { + while (minIndex < maxIndex) { + int currIndex = (minIndex + maxIndex - 1) >>> 1; + + float currValue = buf.get(currIndex); + int comparison = Float.compare(currValue, value); + if (comparison == 0) { + return currIndex; + } + + if (comparison < 0) { + minIndex = currIndex + 1; + } else { + maxIndex = currIndex; + } + } + + return -(minIndex + 1); + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java new file mode 100644 index 00000000000..8a34eaa2bdc --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java @@ -0,0 +1,77 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; + +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +@JsonTypeName("customBuckets") +public class CustomBucketsPostAggregator extends ApproximateHistogramPostAggregator +{ + private final float[] breaks; + private String fieldName; + + @JsonCreator + public CustomBucketsPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("breaks") float[] breaks + ) + { + super(name, fieldName); + this.breaks = breaks; + this.fieldName = fieldName; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + return ah.toHistogram(breaks); + } + + @JsonProperty + public float[] getBreaks() + { + return breaks; + } + + @Override + public String toString() + { + return "CustomBucketsPostAggregator{" + + "name='" + this.getName() + '\'' + + ", fieldName='" + this.getFieldName() + '\'' + + ", breaks=" + Arrays.toString(this.getBreaks()) + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java new file mode 100644 index 00000000000..ecdb791ea26 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java @@ -0,0 +1,80 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; +import com.metamx.common.IAE; + +import java.util.Map; +import java.util.Set; + +@JsonTypeName("equalBuckets") +public class EqualBucketsPostAggregator extends ApproximateHistogramPostAggregator +{ + private final int numBuckets; + private String fieldName; + + @JsonCreator + public EqualBucketsPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("numBuckets") int numBuckets + ) + { + super(name, fieldName); + this.numBuckets = numBuckets; + if (this.numBuckets <= 1) { + throw new IAE("Illegal number of buckets[%s], must be > 1", this.numBuckets); + } + this.fieldName = fieldName; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + return ah.toHistogram(numBuckets); + } + + @JsonProperty + public int getNumBuckets() + { + return numBuckets; + } + + @Override + public String toString() + { + return "EqualBucketsPostAggregator{" + + "name='" + this.getName() + '\'' + + ", fieldName='" + this.getFieldName() + '\'' + + ", numBuckets=" + this.getNumBuckets() + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/Histogram.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/Histogram.java new file mode 100644 index 00000000000..384e6eb1fbf --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/Histogram.java @@ -0,0 +1,88 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Arrays; + +public class Histogram +{ + double[] breaks; + double[] counts; + + public Histogram(float[] breaks, double[] counts) + { + double[] retVal = new double[breaks.length]; + for (int i = 0; i < breaks.length; ++i) { + retVal[i] = (double) breaks[i]; + } + + this.breaks = retVal; + this.counts = counts; + } + + @JsonProperty + public double[] getBreaks() + { + return breaks; + } + + @JsonProperty + public double[] getCounts() + { + return counts; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Histogram that = (Histogram) o; + + if (!Arrays.equals(this.getBreaks(), that.getBreaks())) { + return false; + } + if (!Arrays.equals(this.getCounts(), that.getCounts())) { + return false; + } + return true; + } + + @Override + public int hashCode() + { + int result = (this.getBreaks() != null ? ArrayUtils.hashCode(this.getBreaks(), 0, this.getBreaks().length) : 0); + result = 31 * result + (this.getCounts() != null ? ArrayUtils.hashCode( + this.getCounts(), + 0, + this.getCounts().length + ) : 0); + return result; + } + + +} \ No newline at end of file diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java new file mode 100644 index 00000000000..b0a0ead93d6 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +@JsonTypeName("max") +public class MaxPostAggregator extends ApproximateHistogramPostAggregator +{ + static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare(Object o, Object o1) + { + return Double.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue()); + } + }; + + private String fieldName; + + @JsonCreator + public MaxPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ) + { + super(name, fieldName); + this.fieldName = fieldName; + } + + @Override + public Comparator getComparator() + { + return COMPARATOR; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + return ah.getMax(); + } + + @Override + public String toString() + { + return "QuantilePostAggregator{" + + "fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java new file mode 100644 index 00000000000..d986901b89f --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +@JsonTypeName("min") +public class MinPostAggregator extends ApproximateHistogramPostAggregator +{ + static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare(Object o, Object o1) + { + return Double.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue()); + } + }; + + private String fieldName; + + @JsonCreator + public MinPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ) + { + super(name, fieldName); + this.fieldName = fieldName; + } + + @Override + public Comparator getComparator() + { + return COMPARATOR; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + return ah.getMin(); + } + + @Override + public String toString() + { + return "QuantilePostAggregator{" + + "fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java new file mode 100644 index 00000000000..7a06a906c89 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java @@ -0,0 +1,96 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; +import com.metamx.common.IAE; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +@JsonTypeName("quantile") +public class QuantilePostAggregator extends ApproximateHistogramPostAggregator +{ + static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare(Object o, Object o1) + { + return Double.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue()); + } + }; + + private final float probability; + private String fieldName; + + @JsonCreator + public QuantilePostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("probability") float probability + ) + { + super(name, fieldName); + this.probability = probability; + this.fieldName = fieldName; + + if (probability < 0 | probability > 1) { + throw new IAE("Illegal probability[%s], must be strictly between 0 and 1", probability); + } + } + + @Override + public Comparator getComparator() + { + return COMPARATOR; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + return ah.getQuantiles(new float[]{this.getProbability()})[0]; + } + + @JsonProperty + public float getProbability() + { + return probability; + } + + @Override + public String toString() + { + return "QuantilePostAggregator{" + + "probability=" + probability + + ", fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/Quantiles.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/Quantiles.java new file mode 100644 index 00000000000..c9471bc3e0b --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/Quantiles.java @@ -0,0 +1,111 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Arrays; + +@JsonTypeName("quantiles") +public class Quantiles +{ + float[] probabilities; + float[] quantiles; + float min; + float max; + + @JsonCreator + public Quantiles( + @JsonProperty("probabilities") float[] probabilities, + @JsonProperty("quantiles") float[] quantiles, + @JsonProperty("min") float min, + @JsonProperty("max") float max + ) + { + this.probabilities = probabilities; + this.quantiles = quantiles; + this.min = min; + this.max = max; + } + + @JsonProperty + public float[] getProbabilities() + { + return probabilities; + } + + @JsonProperty + public float[] getQuantiles() + { + return quantiles; + } + + @JsonProperty + public float getMin() + { + return min; + } + + @JsonProperty + public float getMax() + { + return max; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Quantiles quantiles1 = (Quantiles) o; + + if (Float.compare(quantiles1.max, max) != 0) { + return false; + } + if (Float.compare(quantiles1.min, min) != 0) { + return false; + } + if (!Arrays.equals(probabilities, quantiles1.probabilities)) { + return false; + } + if (!Arrays.equals(quantiles, quantiles1.quantiles)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = probabilities != null ? Arrays.hashCode(probabilities) : 0; + result = 31 * result + (quantiles != null ? Arrays.hashCode(quantiles) : 0); + result = 31 * result + (min != +0.0f ? Float.floatToIntBits(min) : 0); + result = 31 * result + (max != +0.0f ? Float.floatToIntBits(max) : 0); + return result; + } +} diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java new file mode 100644 index 00000000000..181861e5f80 --- /dev/null +++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java @@ -0,0 +1,92 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; +import com.metamx.common.IAE; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +@JsonTypeName("quantiles") +public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator +{ + private final float[] probabilities; + private String fieldName; + + @JsonCreator + public QuantilesPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("probabilities") float[] probabilities + ) + { + super(name, fieldName); + this.probabilities = probabilities; + this.fieldName = fieldName; + + for (float p : probabilities) { + if (p < 0 | p > 1) { + throw new IAE("Illegal probability[%s], must be strictly between 0 and 1", p); + } + } + } + + @Override + public Comparator getComparator() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Object compute(Map values) + { + final ApproximateHistogram ah = (ApproximateHistogram) values.get(this.getFieldName()); + + return new Quantiles(this.getProbabilities(), ah.getQuantiles(this.getProbabilities()), ah.getMin(), ah.getMax()); + } + + @JsonProperty + public float[] getProbabilities() + { + return probabilities; + } + + @Override + public String toString() + { + return "EqualBucketsPostAggregator{" + + "name='" + this.getName() + '\'' + + ", fieldName='" + this.getFieldName() + '\'' + + ", probabilities=" + Arrays.toString(this.getProbabilities()) + + '}'; + } +} diff --git a/histogram/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/histogram/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..d39951f0cd8 --- /dev/null +++ b/histogram/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.query.aggregation.histogram.ApproximateHistogramDruidModule \ No newline at end of file diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorTest.java new file mode 100644 index 00000000000..353f9e73ad7 --- /dev/null +++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorTest.java @@ -0,0 +1,76 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.TestFloatColumnSelector; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class ApproximateHistogramAggregatorTest +{ + private void aggregateBuffer(TestFloatColumnSelector selector, BufferAggregator agg, ByteBuffer buf, int position) + { + agg.aggregate(buf, position); + selector.increment(); + } + + @Test + public void testBufferAggregate() throws Exception + { + final float[] values = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45}; + final int resolution = 5; + final int numBuckets = 5; + + final TestFloatColumnSelector selector = new TestFloatColumnSelector(values); + + ApproximateHistogramAggregatorFactory factory = new ApproximateHistogramAggregatorFactory( + "billy", "billy", resolution, numBuckets, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY + ); + ApproximateHistogramBufferAggregator agg = new ApproximateHistogramBufferAggregator(selector, resolution, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY); + + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + for (int i = 0; i < values.length; i++) { + aggregateBuffer(selector, agg, buf, position); + } + + ApproximateHistogram h = ((ApproximateHistogram) agg.get(buf, position)); + + Assert.assertArrayEquals( + "final bin positions don't match expected positions", + new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions, 0.01f + ); + + Assert.assertArrayEquals( + "final bin counts don't match expected counts", + new long[]{1, 2, 3, 3, 1}, h.bins() + ); + + Assert.assertEquals("getMin value doesn't match expected getMin", 2, h.min(), 0); + Assert.assertEquals("getMax value doesn't match expected getMax", 45, h.max(), 0); + + Assert.assertEquals("bin count doesn't match expected bin count", 5, h.binCount()); + } +} diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramErrorBenchmark.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramErrorBenchmark.java new file mode 100644 index 00000000000..33621dc2144 --- /dev/null +++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramErrorBenchmark.java @@ -0,0 +1,191 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.google.common.primitives.Floats; +import io.druid.query.aggregation.Histogram; + +import java.util.Arrays; +import java.util.Random; + +public class ApproximateHistogramErrorBenchmark +{ + private boolean debug = true; + private int numBuckets = 20; + private int numBreaks = numBuckets + 1; + private int numPerHist = 50; + private int numHists = 10; + private int resolution = 50; + private int combinedResolution = 100; + private Random rand = new Random(2); + + public ApproximateHistogramErrorBenchmark setDebug(boolean debug) + { + this.debug = debug; + return this; + } + + public ApproximateHistogramErrorBenchmark setNumBuckets(int numBuckets) + { + this.numBuckets = numBuckets; + return this; + } + + public ApproximateHistogramErrorBenchmark setNumBreaks(int numBreaks) + { + this.numBreaks = numBreaks; + return this; + } + + public ApproximateHistogramErrorBenchmark setNumPerHist(int numPerHist) + { + this.numPerHist = numPerHist; + return this; + } + + public ApproximateHistogramErrorBenchmark setNumHists(int numHists) + { + this.numHists = numHists; + return this; + } + + public ApproximateHistogramErrorBenchmark setResolution(int resolution) + { + this.resolution = resolution; + return this; + } + + public ApproximateHistogramErrorBenchmark setCombinedResolution(int combinedResolution) + { + this.combinedResolution = combinedResolution; + return this; + } + + + public static void main(String[] args) + { + ApproximateHistogramErrorBenchmark approxHist = new ApproximateHistogramErrorBenchmark(); + System.out.println( + Arrays.toString( + approxHist.setDebug(true) + .setNumPerHist(50) + .setNumHists(10000) + .setResolution(50) + .setCombinedResolution(100) + .getErrors() + ) + ); + + + ApproximateHistogramErrorBenchmark approxHist2 = new ApproximateHistogramErrorBenchmark(); + int[] numHistsArray = new int[]{10, 100, 1000, 10000, 100000}; + float[] errs1 = new float[numHistsArray.length]; + float[] errs2 = new float[numHistsArray.length]; + for (int i = 0; i < numHistsArray.length; ++i) { + float[] tmp = approxHist2.setDebug(false).setNumHists(numHistsArray[i]).setCombinedResolution(100).getErrors(); + errs1[i] = tmp[0]; + errs2[i] = tmp[1]; + } + + System.out + .format("Number of histograms for folding : %s \n", Arrays.toString(numHistsArray)); + System.out.format("Errors for approximate histogram : %s \n", Arrays.toString(errs1)); + System.out.format("Errors for approximate histogram, ruleFold : %s \n", Arrays.toString(errs2)); + } + + private float[] getErrors() + { + final int numValues = numHists * numPerHist; + final float[] values = new float[numValues]; + + for (int i = 0; i < numValues; ++i) { + values[i] = (float) rand.nextGaussian(); + } + + float min = Floats.min(values); + min = (float) (min < 0 ? 1.02 : .98) * min; + float max = Floats.max(values); + max = (float) (max < 0 ? .98 : 1.02) * max; + final float stride = (max - min) / numBuckets; + final float[] breaks = new float[numBreaks]; + for (int i = 0; i < numBreaks; i++) { + breaks[i] = min + stride * i; + } + + Histogram h = new Histogram(breaks); + for (float v : values) { + h.offer(v); + } + double[] hcounts = h.asVisual().counts; + + ApproximateHistogram ah1 = new ApproximateHistogram(resolution); + ApproximateHistogram ah2 = new ApproximateHistogram(combinedResolution); + ApproximateHistogram tmp = new ApproximateHistogram(resolution); + for (int i = 0; i < numValues; ++i) { + tmp.offer(values[i]); + if ((i + 1) % numPerHist == 0) { + ah1.fold(tmp); + ah2.foldRule(tmp, null, null); + tmp = new ApproximateHistogram(resolution); + } + } + double[] ahcounts1 = ah1.toHistogram(breaks).getCounts(); + double[] ahcounts2 = ah2.toHistogram(breaks).getCounts(); + + float err1 = 0; + float err2 = 0; + for (int j = 0; j < hcounts.length; j++) { + err1 += Math.abs((hcounts[j] - ahcounts1[j]) / numValues); + err2 += Math.abs((hcounts[j] - ahcounts2[j]) / numValues); + } + + if (debug) { + float sum = 0; + for (double v : hcounts) { + sum += v; + } + System.out.println("Exact Histogram Sum:"); + System.out.println(sum); + sum = 0; + for (double v : ahcounts1) { + sum += v; + } + System.out.println("Approximate Histogram Sum:"); + System.out.println(sum); + sum = 0; + for (double v : ahcounts2) { + sum += v; + } + System.out.println("Approximate Histogram Rule Fold Sum:"); + System.out.println(sum); + System.out.println("Exact Histogram:"); + System.out.println(h.asVisual()); + System.out.println("Approximate Histogram:"); + System.out.println(ah1.toHistogram(breaks)); + System.out.println("Approximate Histogram Rule Fold:"); + System.out.println(ah2.toHistogram(breaks)); + System.out.format("Error for approximate histogram: %s \n", err1); + System.out.format("Error for approximate histogram, ruleFold: %s \n", err2); + System.out.format("Error ratio for AHRF: %s \n", err2 / err1); + } + return new float[]{err1, err2, err2 / err1}; + } + +} diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramPostAggregatorTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramPostAggregatorTest.java new file mode 100644 index 00000000000..f8484494b2d --- /dev/null +++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramPostAggregatorTest.java @@ -0,0 +1,65 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import io.druid.query.aggregation.TestFloatColumnSelector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class ApproximateHistogramPostAggregatorTest +{ + static final float[] VALUES = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + + protected ApproximateHistogram buildHistogram(int size, float[] values) + { + ApproximateHistogram h = new ApproximateHistogram(size); + for (float v : values) { + h.offer(v); + } + return h; + } + + @Test + public void testCompute() + { + ApproximateHistogram ah = buildHistogram(10, VALUES); + final TestFloatColumnSelector selector = new TestFloatColumnSelector(VALUES); + + ApproximateHistogramAggregator agg = new ApproximateHistogramAggregator("price", selector, 10, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY); + for (int i = 0; i < VALUES.length; i++) { + agg.aggregate(); + selector.increment(); + } + + Map metricValues = new HashMap(); + metricValues.put(agg.getName(), agg.get()); + + ApproximateHistogramPostAggregator approximateHistogramPostAggregator = new EqualBucketsPostAggregator( + "approxHist", + "price", + 5 + ); + Assert.assertEquals(ah.toHistogram(5), approximateHistogramPostAggregator.compute(metricValues)); + } + +} diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramQueryTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramQueryTest.java new file mode 100644 index 00000000000..dd4406c5076 --- /dev/null +++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramQueryTest.java @@ -0,0 +1,247 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.collections.StupidPool; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.TestQueryRunners; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; +import io.druid.query.aggregation.MinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.topn.TopNQuery; +import io.druid.query.topn.TopNQueryBuilder; +import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryQueryToolChest; +import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.query.topn.TopNResultValue; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class ApproximateHistogramQueryTest +{ + + private final QueryRunner runner; + + public ApproximateHistogramQueryTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + List retVal = Lists.newArrayList(); + retVal.addAll( + QueryRunnerTestHelper.makeQueryRunners( + new TopNQueryRunnerFactory( + TestQueryRunners.getPool(), + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ) + ); + retVal.addAll( + QueryRunnerTestHelper.makeQueryRunners( + new TopNQueryRunnerFactory( + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(2000); + } + } + ), + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ) + ); + + return retVal; + } + + @Test + public void testTopNWithApproximateHistogramAgg() + { + ApproximateHistogramAggregatorFactory factory = new ApproximateHistogramAggregatorFactory( + "apphisto", + "index", + 10, + 5, + Float.NEGATIVE_INFINITY, + Float.POSITIVE_INFINITY + ); + + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.providerDimension) + .metric(QueryRunnerTestHelper.dependentPostAggMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index"), + factory + ) + ) + ) + ) + .postAggregators( + Arrays.asList( + QueryRunnerTestHelper.addRowsIndexConstant, + QueryRunnerTestHelper.dependentPostAgg, + new QuantilePostAggregator("quantile", "apphisto", 0.5f) + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 216053.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .put("quantile", 1085.6775f) + .put( + "apphisto", + new Histogram( + new float[]{ + 554.4271240234375f, + 792.3260498046875f, + 1030.2249755859375f, + 1268.1239013671875f, + 1506.0228271484375f, + 1743.9217529296875f + }, + new double[]{ + 0.0D, + 39.42073059082031D, + 103.29110717773438D, + 34.93659591674805D, + 8.351564407348633D + } + ) + ) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 192420.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .put("quantile", 880.9881f) + .put( + "apphisto", + new Histogram( + new float[]{ + 214.97299194335938f, + 545.9906005859375f, + 877.0081787109375f, + 1208.0257568359375f, + 1539.0433349609375f, + 1870.06103515625f + }, + new double[]{ + 0.0D, + 67.53287506103516D, + 72.22068786621094D, + 31.984678268432617D, + 14.261756896972656D + } + ) + ) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .put("quantile", 101.78856f) + .put( + "apphisto", + new Histogram( + new float[]{ + 4.457897186279297f, + 59.02102279663086f, + 113.58415222167969f, + 168.14727783203125f, + 222.7104034423828f, + 277.2735290527344f + }, + new double[]{ + 0.0D, + 462.4309997558594D, + 357.5404968261719D, + 15.022850036621094D, + 2.0056631565093994D + } + ) + ) + .build() + ) + ) + ) + ); + + TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } +} diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTest.java new file mode 100644 index 00000000000..a0d8bda7ec5 --- /dev/null +++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTest.java @@ -0,0 +1,588 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +public class ApproximateHistogramTest +{ + static final float[] VALUES = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45}; + static final float[] VALUES2 = {23, 19, 10, 16, 36, 2, 1, 9, 32, 30, 45, 46}; + + static final float[] VALUES3 = { + 20, 16, 19, 27, 17, 20, 18, 20, 28, 14, 17, 21, 20, 21, 10, 25, 23, 17, 21, 18, + 14, 20, 18, 12, 19, 20, 23, 25, 15, 22, 14, 17, 15, 23, 23, 15, 27, 20, 17, 15 + }; + static final float[] VALUES4 = { + 27.489f, 3.085f, 3.722f, 66.875f, 30.998f, -8.193f, 5.395f, 5.109f, 10.944f, 54.75f, + 14.092f, 15.604f, 52.856f, 66.034f, 22.004f, -14.682f, -50.985f, 2.872f, 61.013f, + -21.766f, 19.172f, 62.882f, 33.537f, 21.081f, 67.115f, 44.789f, 64.1f, 20.911f, + -6.553f, 2.178f + }; + static final float[] VALUES5 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + static final float[] VALUES6 = {1f, 1.5f, 2f, 2.5f, 3f, 3.5f, 4f, 4.5f, 5f, 5.5f, 6f, 6.5f, 7f, 7.5f, 8f, 8.5f, 9f, 9.5f, 10f}; + + protected ApproximateHistogram buildHistogram(int size, float[] values) + { + ApproximateHistogram h = new ApproximateHistogram(size); + for (float v : values) { + h.offer(v); + } + return h; + } + + protected ApproximateHistogram buildHistogram(int size, float[] values, float lowerLimit, float upperLimit) + { + ApproximateHistogram h = new ApproximateHistogram(size, lowerLimit, upperLimit); + for (float v : values) { + h.offer(v); + } + return h; + } + + @Test + public void testOffer() throws Exception + { + ApproximateHistogram h = buildHistogram(5, VALUES); + + // (2, 1), (9.5, 2), (19.33, 3), (32.67, 3), (45, 1) + Assert.assertArrayEquals( + "final bin positions match expected positions", + new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f + ); + + Assert.assertArrayEquals( + "final bin positions match expected positions", + new long[]{1, 2, 3, 3, 1}, h.bins() + ); + + Assert.assertEquals("min value matches expexted min", 2, h.min(), 0); + Assert.assertEquals("max value matches expexted max", 45, h.max(), 0); + + Assert.assertEquals("bin count matches expected bin count", 5, h.binCount()); + } + + @Test + public void testFold() + { + ApproximateHistogram merged = new ApproximateHistogram(0); + ApproximateHistogram mergedFast = new ApproximateHistogram(0); + ApproximateHistogram h1 = new ApproximateHistogram(5); + ApproximateHistogram h2 = new ApproximateHistogram(10); + + for (int i = 0; i < 5; ++i) { + h1.offer(VALUES[i]); + } + for (int i = 5; i < VALUES.length; ++i) { + h2.offer(VALUES[i]); + } + + merged.fold(h1); + merged.fold(h2); + mergedFast.foldFast(h1); + mergedFast.foldFast(h2); + + Assert.assertArrayEquals( + "final bin positions match expected positions", + new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, merged.positions(), 0.1f + ); + Assert.assertArrayEquals( + "final bin positions match expected positions", + new float[]{11.2f, 30.25f, 45f}, mergedFast.positions(), 0.1f + ); + + Assert.assertArrayEquals( + "final bin counts match expected counts", + new long[]{1, 2, 3, 3, 1}, merged.bins() + ); + Assert.assertArrayEquals( + "final bin counts match expected counts", + new long[]{5, 4, 1}, mergedFast.bins() + ); + + Assert.assertEquals("merged max matches expected value", 45f, merged.max(), 0.1f); + Assert.assertEquals("mergedfast max matches expected value", 45f, mergedFast.max(), 0.1f); + Assert.assertEquals("merged min matches expected value", 2f, merged.min(), 0.1f); + Assert.assertEquals("mergedfast min matches expected value", 2f, mergedFast.min(), 0.1f); + + // fold where merged bincount is less than total bincount + ApproximateHistogram a = buildHistogram(10, new float[]{1, 2, 3, 4, 5, 6}); + ApproximateHistogram aFast = buildHistogram(10, new float[]{1, 2, 3, 4, 5, 6}); + ApproximateHistogram b = buildHistogram(5, new float[]{3, 4, 5, 6}); + + a.fold(b); + aFast.foldFast(b); + + Assert.assertEquals( + new ApproximateHistogram( + 6, + new float[]{1, 2, 3, 4, 5, 6, 0, 0, 0, 0}, + new long[]{1, 1, 2, 2, 2, 2, 0, 0, 0, 0}, + 1, 6 + ), a + ); + Assert.assertEquals( + new ApproximateHistogram( + 6, + new float[]{1, 2, 3, 4, 5, 6, 0, 0, 0, 0}, + new long[]{1, 1, 2, 2, 2, 2, 0, 0, 0, 0}, + 1, 6 + ), aFast + ); + + ApproximateHistogram h3 = new ApproximateHistogram(10); + ApproximateHistogram h4 = new ApproximateHistogram(10); + for (float v : VALUES3) { + h3.offer(v); + } + for (float v : VALUES4) { + h4.offer(v); + } + h3.fold(h4); + Assert.assertArrayEquals( + "final bin positions match expected positions", + new float[]{-50.98f, -21.77f, -9.81f, 3.73f, 13.72f, 20.1f, 29f, 44.79f, 53.8f, 64.67f}, + h3.positions(), 0.1f + ); + Assert.assertArrayEquals( + "final bin counts match expected counts", + new long[]{1, 1, 3, 6, 12, 32, 6, 1, 2, 6}, h3.bins() + ); + + } + + @Test + public void testFoldNothing() throws Exception + { + ApproximateHistogram h1 = new ApproximateHistogram(10); + ApproximateHistogram h2 = new ApproximateHistogram(10); + + h1.fold(h2); + h1.foldFast(h2); + } + + @Test + public void testFoldNothing2() throws Exception + { + ApproximateHistogram h1 = new ApproximateHistogram(10); + ApproximateHistogram h1Fast = new ApproximateHistogram(10); + ApproximateHistogram h2 = new ApproximateHistogram(10); + ApproximateHistogram h3 = new ApproximateHistogram(10); + ApproximateHistogram h4 = new ApproximateHistogram(10); + ApproximateHistogram h4Fast = new ApproximateHistogram(10); + for (float v : VALUES3) { + h3.offer(v); + h4.offer(v); + h4Fast.offer(v); + } + + h1.fold(h3); + h4.fold(h2); + h1Fast.foldFast(h3); + h4Fast.foldFast(h2); + + Assert.assertEquals(h3, h1); + Assert.assertEquals(h4, h3); + Assert.assertEquals(h3, h1Fast); + Assert.assertEquals(h3, h4Fast); + } + + //@Test + public void testFoldSpeed() + { + final int combinedHistSize = 200; + final int histSize = 50; + final int numRand = 10000; + ApproximateHistogram h = new ApproximateHistogram(combinedHistSize); + Random rand = new Random(0); + //for(int i = 0; i < 200; ++i) h.offer((float)(rand.nextGaussian() * 50.0)); + long tFold = 0; + int count = 5000000; + Float[] randNums = new Float[numRand]; + for (int i = 0; i < numRand; i++) { + randNums[i] = (float) rand.nextGaussian(); + } + + List randHist = Lists.newLinkedList(); + Iterator it = Iterators.cycle(randHist); + + for(int k = 0; k < numRand; ++k) { + ApproximateHistogram tmp = new ApproximateHistogram(histSize); + for (int i = 0; i < 20; ++i) { + tmp.offer((float) (rand.nextGaussian() + (double)k)); + } + randHist.add(tmp); + } + + float[] mergeBufferP = new float[combinedHistSize * 2]; + long[] mergeBufferB = new long[combinedHistSize * 2]; + float[] mergeBufferD = new float[combinedHistSize * 2]; + + for (int i = 0; i < count; ++i) { + ApproximateHistogram tmp = it.next(); + + long t0 = System.nanoTime(); + //h.fold(tmp, mergeBufferP, mergeBufferB, mergeBufferD); + h.foldFast(tmp, mergeBufferP, mergeBufferB); + tFold += System.nanoTime() - t0; + } + + System.out.println(String.format("Average folds per second : %f", (double) count / (double) tFold * 1e9)); + } + + @Test + public void testSum() + { + ApproximateHistogram h = buildHistogram(5, VALUES); + + Assert.assertEquals(0.0f, h.sum(0), 0.01); + Assert.assertEquals(1.0f, h.sum(2), 0.01); + Assert.assertEquals(1.16f, h.sum(5), 0.01); + Assert.assertEquals(3.28f, h.sum(15), 0.01); + Assert.assertEquals(VALUES.length, h.sum(45), 0.01); + Assert.assertEquals(VALUES.length, h.sum(46), 0.01); + + ApproximateHistogram h2 = buildHistogram(5, VALUES2); + + Assert.assertEquals(0.0f, h2.sum(0), 0.01); + Assert.assertEquals(0.0f, h2.sum(1f), 0.01); + Assert.assertEquals(1.0f, h2.sum(1.5f), 0.01); + Assert.assertEquals(1.125f, h2.sum(2f), 0.001); + Assert.assertEquals(2.0625f, h2.sum(5.75f), 0.001); + Assert.assertEquals(3.0f, h2.sum(9.5f), 0.01); + Assert.assertEquals(11.0f, h2.sum(45.5f), 0.01); + Assert.assertEquals(12.0f, h2.sum(46f), 0.01); + Assert.assertEquals(12.0f, h2.sum(47f), 0.01); + } + + @Test + public void testSerializeCompact() + { + ApproximateHistogram h = buildHistogram(5, VALUES); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(h.toBytes())); + + ApproximateHistogram h2 = new ApproximateHistogram(50).fold(h); + Assert.assertEquals(h2, ApproximateHistogram.fromBytes(h2.toBytes())); + } + + @Test + public void testSerializeDense() + { + ApproximateHistogram h = buildHistogram(5, VALUES); + ByteBuffer buf = ByteBuffer.allocate(h.getDenseStorageSize()); + h.toBytesDense(buf); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(buf.array())); + } + + @Test + public void testSerializeSparse() + { + ApproximateHistogram h = buildHistogram(5, VALUES); + ByteBuffer buf = ByteBuffer.allocate(h.getSparseStorageSize()); + h.toBytesSparse(buf); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(buf.array())); + } + + @Test + public void testSerializeCompactExact() + { + ApproximateHistogram h = buildHistogram(50, new float[]{1f, 2f, 3f, 4f, 5f}); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(h.toBytes())); + + h = buildHistogram(5, new float[]{1f, 2f, 3f}); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(h.toBytes())); + + h = new ApproximateHistogram(40).fold(h); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(h.toBytes())); + } + + @Test + public void testSerializeEmpty() + { + ApproximateHistogram h = new ApproximateHistogram(50); + Assert.assertEquals(h, ApproximateHistogram.fromBytes(h.toBytes())); + } + + @Test + public void testQuantileSmaller() + { + ApproximateHistogram h = buildHistogram(20, VALUES5); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{5f}, + h.getQuantiles(new float[]{.5f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{3.33f, 6.67f}, + h.getQuantiles(new float[]{.333f, .666f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{2.5f, 5f, 7.5f}, + h.getQuantiles(new float[]{.25f, .5f, .75f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{2f, 4f, 6f, 8f}, + h.getQuantiles(new float[]{.2f, .4f, .6f, .8f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{1f, 2f, 3f, 4f, 5f, 6f, 7f, 8f, 9f}, + h.getQuantiles(new float[]{.1f, .2f, .3f, .4f, .5f, .6f, .7f, .8f, .9f}), 0.1f + ); + } + + @Test + public void testQuantileEqualSize() + { + ApproximateHistogram h = buildHistogram(10, VALUES5); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{5f}, + h.getQuantiles(new float[]{.5f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{3.33f, 6.67f}, + h.getQuantiles(new float[]{.333f, .666f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{2.5f, 5f, 7.5f}, + h.getQuantiles(new float[]{.25f, .5f, .75f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{2f, 4f, 6f, 8f}, + h.getQuantiles(new float[]{.2f, .4f, .6f, .8f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{1f, 2f, 3f, 4f, 5f, 6f, 7f, 8f, 9f}, + h.getQuantiles(new float[]{.1f, .2f, .3f, .4f, .5f, .6f, .7f, .8f, .9f}), 0.1f + ); + } + + @Test + public void testQuantileBigger() + { + ApproximateHistogram h = buildHistogram(5, VALUES5); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{4.5f}, + h.getQuantiles(new float[]{.5f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{2.83f, 6.17f}, + h.getQuantiles(new float[]{.333f, .666f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{2f, 4.5f, 7f}, + h.getQuantiles(new float[]{.25f, .5f, .75f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{1.5f, 3.5f, 5.5f, 7.5f}, + h.getQuantiles(new float[]{.2f, .4f, .6f, .8f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{1f, 1.5f, 2.5f, 3.5f, 4.5f, 5.5f, 6.5f, 7.5f, 8.5f}, + h.getQuantiles(new float[]{.1f, .2f, .3f, .4f, .5f, .6f, .7f, .8f, .9f}), 0.1f + ); + } + + @Test + public void testQuantileBigger2() + { + float[] thousand = new float[1000]; + for (int i = 1; i <= 1000; ++i) { + thousand[i - 1] = i; + } + ApproximateHistogram h = buildHistogram(100, thousand); + + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{493.5f}, + h.getQuantiles(new float[]{.5f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{327.5f, 662f}, + h.getQuantiles(new float[]{.333f, .666f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{244.5f, 493.5f, 746f}, + h.getQuantiles(new float[]{.25f, .5f, .75f}), 0.1f + ); + Assert.assertArrayEquals( + "expected quantiles match actual quantiles", + new float[]{96.5f, 196.53f, 294.5f, 395.5f, 493.5f, 597f, 696f, 795f, 895.25f}, + h.getQuantiles(new float[]{.1f, .2f, .3f, .4f, .5f, .6f, .7f, .8f, .9f}), 0.1f + ); + } + + @Test + public void testLimitSum() + { + final float lowerLimit = 0f; + final float upperLimit = 10f; + + ApproximateHistogram h = buildHistogram(15, VALUES6, lowerLimit, upperLimit); + + for (int i = 1; i <= 20; ++i) { + ApproximateHistogram hLow = new ApproximateHistogram(5); + ApproximateHistogram hHigh = new ApproximateHistogram(5); + hLow.offer(lowerLimit - i); + hHigh.offer(upperLimit + i); + h.foldFast(hLow); + h.foldFast(hHigh); + } + + Assert.assertEquals(20f, h.sum(lowerLimit), .7f); + Assert.assertEquals(VALUES6.length + 20f, h.sum(upperLimit), 0.01); + } + + @Test + public void testBuckets() + { + final float[] values = new float[]{-5f, .01f, .02f, .06f, .12f, 1f, 2f}; + ApproximateHistogram h = buildHistogram(50, values, 0f, 1f); + Histogram h2 = h.toHistogram(.05f, 0f); + + Assert.assertArrayEquals( + "expected counts match actual counts", + new double[]{1f, 2f, 1f, 1f, 0f, 1f, 1f}, + h2.getCounts(), 0.1f + ); + + Assert.assertArrayEquals( + "expected breaks match actual breaks", + new double[]{-5.05f, 0f, .05f, .1f, .15f, .95f, 1f, 2f}, + h2.getBreaks(), 0.1f + ); + } + + @Test + public void testBuckets2() + { + final float[] values = new float[]{-5f, .01f, .02f, .06f, .12f, .94f, 1f, 2f}; + ApproximateHistogram h = buildHistogram(50, values, 0f, 1f); + Histogram h2 = h.toHistogram(.05f, 0f); + + Assert.assertArrayEquals( + "expected counts match actual counts", + new double[]{1f, 2f, 1f, 1f, 0f, 1f, 1f, 1f}, + h2.getCounts(), 0.1f + ); + + Assert.assertArrayEquals( + "expected breaks match actual breaks", + new double[]{-5.05f, 0f, .05f, .1f, .15f, .9f, .95f, 1f, 2.05f}, + h2.getBreaks(), 0.1f + ); + } + + @Test + public void testBuckets3() + { + final float[] values = new float[]{0f, 0f, .02f, .06f, .12f, .94f}; + ApproximateHistogram h = buildHistogram(50, values, 0f, 1f); + Histogram h2 = h.toHistogram(1f, 0f); + + Assert.assertArrayEquals( + "expected counts match actual counts", + new double[]{2f, 4f}, + h2.getCounts(), 0.1f + ); + + Assert.assertArrayEquals( + "expected breaks match actual breaks", + new double[]{-1f, 0f, 1f}, + h2.getBreaks(), 0.1f + ); + } + + @Test + public void testBuckets4() + { + final float[] values = new float[]{0f, 0f, 0.01f, 0.51f, 0.6f,0.8f}; + ApproximateHistogram h = buildHistogram(50, values, 0.5f,1f); + Histogram h3 = h.toHistogram(0.2f,0); + + Assert.assertArrayEquals( + "Expected counts match actual counts", + new double[]{3f,2f,1f}, + h3.getCounts(), + 0.1f + ); + + Assert.assertArrayEquals( + "expected breaks match actual breaks", + new double[]{-0.2f,0.5f,0.7f,0.9f}, + h3.getBreaks(), 0.1f + ); + } + + @Test public void testBuckets5() + { + final float[] values = new float[]{0.1f,0.5f,0.6f}; + ApproximateHistogram h = buildHistogram(50, values, 0f,1f); + Histogram h4 = h.toHistogram(0.5f,0); + + Assert.assertArrayEquals( + "Expected counts match actual counts", + new double[]{2,1}, + h4.getCounts(), + 0.1f + ); + + Assert.assertArrayEquals( + "Expected breaks match actual breaks", + new double[]{0f,0.5f,1f}, + h4.getBreaks(), + 0.1f + ); + } + + @Test public void testEmptyHistogram() { + ApproximateHistogram h = new ApproximateHistogram(50); + Assert.assertArrayEquals( + new float[]{Float.NaN, Float.NaN}, + h.getQuantiles(new float[]{0.8f, 0.9f}), + 1e-9f + ); + } + + +} diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/QuantilesTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/QuantilesTest.java new file mode 100644 index 00000000000..14ec2eb34a7 --- /dev/null +++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/QuantilesTest.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation.histogram; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.LinkedHashMap; + +public class QuantilesTest +{ + @Test + public void testSerialization() throws Exception + { + ObjectMapper mapper = new DefaultObjectMapper(); + + float[] probabilities = new float[]{0.25f, 0.5f, 0.75f}; + float[] quantiles = new float[]{0.25f, 0.5f, 0.75f}; + float min = 0f; + float max = 4f; + + String theString = mapper.writeValueAsString( + new Quantiles(probabilities, quantiles, min, max) + ); + + Object theObject = mapper.readValue(theString, Object.class); + Assert.assertThat(theObject, CoreMatchers.instanceOf(LinkedHashMap.class)); + + LinkedHashMap theMap = (LinkedHashMap) theObject; + + ArrayList theProbabilities = (ArrayList) theMap.get("probabilities"); + + Assert.assertEquals(probabilities.length, theProbabilities.size()); + for (int i = 0; i < theProbabilities.size(); ++i) { + Assert.assertEquals(probabilities[i], ((Number) theProbabilities.get(i)).floatValue(), 0.0001f); + } + + ArrayList theQuantiles = (ArrayList) theMap.get("quantiles"); + + Assert.assertEquals(quantiles.length, theQuantiles.size()); + for (int i = 0; i < theQuantiles.size(); ++i) { + Assert.assertEquals(quantiles[i], ((Number) theQuantiles.get(i)).floatValue(), 0.0001f); + } + + Assert.assertEquals( + "serialized min. matches expected min.", + min, + ((Number) theMap.get("min")).floatValue(), + 0.0001f + ); + Assert.assertEquals( + "serialized max. matches expected max.", + max, + ((Number) theMap.get("max")).floatValue(), + 0.0001f + ); + + + } +} diff --git a/indexing-hadoop/README b/indexing-hadoop/README deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/pom.xml b/pom.xml index 9580f7aac4f..2b7f88276e8 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ kafka-seven kafka-eight rabbitmq + histogram