+ * ApproximateHistogram merged = new ApproximateHistogram(mergedBinCount, mergedPositions, mergedBins); + * + * int targetSize = merged.binCount() - numMerge; + * while(merged.binCount() > targetSize) { + * merged.merge(merged.minDeltaIndex()); + * } + *+ * + * @param mergedBinCount + * @param mergedPositions + * @param mergedBins + * @param deltas + * @param numMerge + * @param next + * @param prev + * + * @return the last valid index into the mergedPositions and mergedBins arrays + */ + private static void mergeBins( + int mergedBinCount, float[] mergedPositions, + long[] mergedBins, + float[] deltas, + int numMerge, + int[] next, + int[] prev + ) + { + // repeatedly search for two closest bins, merge them and update the corresponding deltas + + // maintain index to the last valid bin + int lastValidIndex = mergedBinCount - 1; + + // initialize prev / next lookup arrays + for (int i = 0; i < mergedBinCount; ++i) { + next[i] = i + 1; + } + for (int i = 0; i < mergedBinCount; ++i) { + prev[i] = i - 1; + } + + // initialize min-heap of deltas and the reverse index into the heap + int heapSize = mergedBinCount - 1; + int[] heap = new int[heapSize]; + int[] reverseIndex = new int[heapSize]; + for (int i = 0; i < heapSize; ++i) { + heap[i] = i; + } + for (int i = 0; i < heapSize; ++i) { + reverseIndex[i] = i; + } + + heapify(heap, reverseIndex, heapSize, deltas); + + { + int i = 0; + while (i < numMerge) { + // find the smallest delta within the range used for bins + + // pick minimum delta by scanning array + //int currentIndex = minIndex(deltas, lastValidIndex); + + // pick minimum delta index using min-heap + int currentIndex = heap[0]; + + final int nextIndex = next[currentIndex]; + final int prevIndex = prev[currentIndex]; + + final long k0 = mergedBins[currentIndex] & COUNT_BITS; + final long k1 = mergedBins[nextIndex] & COUNT_BITS; + final float m0 = mergedPositions[currentIndex]; + final float m1 = mergedPositions[nextIndex]; + final float d1 = deltas[nextIndex]; + + final long sum = k0 + k1; + final float w = (float) k0 / (float) sum; + + // merge bin at given position with the next bin + final float mm0 = (m0 - m1) * w + m1; + + mergedPositions[currentIndex] = mm0; + //mergedPositions[nextIndex] = Float.MAX_VALUE; // for debugging + + mergedBins[currentIndex] = sum | APPROX_FLAG_BIT; + //mergedBins[nextIndex] = -1; // for debugging + + // update deltas and min-heap + if (nextIndex == lastValidIndex) { + // merged bin is the last => remove the current bin delta from the heap + heapSize = heapDelete(heap, reverseIndex, heapSize, reverseIndex[currentIndex], deltas); + + //deltas[currentIndex] = Float.MAX_VALUE; // for debugging + } else { + // merged bin is not the last => remove the merged bin delta from the heap + heapSize = heapDelete(heap, reverseIndex, heapSize, reverseIndex[nextIndex], deltas); + + // updated current delta + deltas[currentIndex] = m1 - mm0 + d1; + + // updated delta is necessarily larger than existing one, therefore we only need to push it down the heap + siftDown(heap, reverseIndex, reverseIndex[currentIndex], heapSize - 1, deltas); + } + + if (prevIndex >= 0) { + // current bin is not the first, therefore update the previous bin delta + deltas[prevIndex] = mm0 - mergedPositions[prevIndex]; + + // updated previous bin delta is necessarily larger than its existing value => push down the heap + siftDown(heap, reverseIndex, reverseIndex[prevIndex], heapSize - 1, deltas); + } + + // mark the merged bin as invalid + // deltas[nextIndex] = Float.MAX_VALUE; // for debugging + + // update last valid index if we merged the last bin + if (nextIndex == lastValidIndex) { + lastValidIndex = currentIndex; + } + + next[currentIndex] = next[nextIndex]; + if (nextIndex < lastValidIndex) { + prev[next[nextIndex]] = currentIndex; + } + + ++i; + } + } + } + + /** + * Builds a min-heap and a reverseIndex into the heap from the given array of values + * + * @param heap min-heap stored as indices into the array of values + * @param reverseIndex reverse index from the array of values into the heap + * @param count current size of the heap + * @param values values to be stored in the heap + */ + private static void heapify(int[] heap, int[] reverseIndex, int count, float[] values) + { + int start = (count - 2) / 2; + while (start >= 0) { + siftDown(heap, reverseIndex, start, count - 1, values); + start--; + } + } + + /** + * Rebalances the min-heap by pushing values from the top down and simultaneously updating the reverse index + * + * @param heap min-heap stored as indices into the array of values + * @param reverseIndex reverse index from the array of values into the heap + * @param start index to start re-balancing from + * @param end index to stop re-balancing at + * @param values values stored in the heap + */ + private static void siftDown(int[] heap, int[] reverseIndex, int start, int end, float[] values) + { + int root = start; + while (root * 2 + 1 <= end) { + int child = root * 2 + 1; + int swap = root; + if (values[heap[swap]] > values[heap[child]]) { + swap = child; + } + if (child + 1 <= end && values[heap[swap]] > values[heap[child + 1]]) { + swap = child + 1; + } + if (swap != root) { + // swap + int tmp = heap[swap]; + heap[swap] = heap[root]; + heap[root] = tmp; + + // heap index from delta index + reverseIndex[heap[swap]] = swap; + reverseIndex[heap[root]] = root; + + root = swap; + } else { + return; + } + } + } + + /** + * Deletes an item from the min-heap and updates the reverse index + * + * @param heap min-heap stored as indices into the array of values + * @param reverseIndex reverse index from the array of values into the heap + * @param count current size of the heap + * @param heapIndex index of the item to be deleted + * @param values values stored in the heap + * + * @return + */ + private static int heapDelete(int[] heap, int[] reverseIndex, int count, int heapIndex, float[] values) + { + int end = count - 1; + + reverseIndex[heap[heapIndex]] = -1; + + heap[heapIndex] = heap[end]; + reverseIndex[heap[heapIndex]] = heapIndex; + + end--; + siftDown(heap, reverseIndex, heapIndex, end, values); + return count - 1; + } + + private static int minIndex(float[] deltas, int lastValidIndex) + { + int minIndex = -1; + float min = Float.MAX_VALUE; + for (int k = 0; k < lastValidIndex; ++k) { + float value = deltas[k]; + if (value < min) { + minIndex = k; + min = value; + } + } + return minIndex; + } + + /** + * Combines two sets of histogram bins using merge-sort and computes the delta between consecutive bin positions. + * Duplicate bins are merged together. + * + * @param leftBinCount + * @param leftPositions + * @param leftBins + * @param rightBinCount + * @param rightPositions + * @param rightBins + * @param mergedPositions array to store the combined bin positions (size must be at least leftBinCount + rightBinCount) + * @param mergedBins array to store the combined bin counts (size must be at least leftBinCount + rightBinCount) + * @param deltas deltas between consecutive bin positions in the merged bins (size must be at least leftBinCount + rightBinCount) + * + * @return the number of combined bins + */ + private static int combineBins( + int leftBinCount, float[] leftPositions, long[] leftBins, + int rightBinCount, float[] rightPositions, long[] rightBins, + float[] mergedPositions, long[] mergedBins, float[] deltas + ) + { + int i = 0; + int j = 0; + int k = 0; + while (j < leftBinCount || k < rightBinCount) { + if (j < leftBinCount && (k == rightBinCount || leftPositions[j] < rightPositions[k])) { + mergedPositions[i] = leftPositions[j]; + mergedBins[i] = leftBins[j]; + ++j; + } else if (k < rightBinCount && (j == leftBinCount || leftPositions[j] > rightPositions[k])) { + mergedPositions[i] = rightPositions[k]; + mergedBins[i] = rightBins[k]; + ++k; + } else { + // combine overlapping bins + mergedPositions[i] = leftPositions[j]; + mergedBins[i] = leftBins[j] + rightBins[k]; + ++j; + ++k; + } + if (deltas != null && i > 0) { + deltas[i - 1] = mergedPositions[i] - mergedPositions[i - 1]; + } + ++i; + } + return i; + } + + /** + * Returns a byte-array representation of this ApproximateHistogram object + * + * @return + */ + @JsonValue + public byte[] toBytes() + { + ByteBuffer buf = ByteBuffer.allocate(getMinStorageSize()); + toBytes(buf); + return buf.array(); + } + + + public int getDenseStorageSize() + { + return Ints.BYTES * 2 + Floats.BYTES * size + Longs.BYTES * size + Floats.BYTES * 2; + } + + public int getSparseStorageSize() + { + return Ints.BYTES * 2 + Floats.BYTES * binCount + Longs.BYTES * binCount + Floats.BYTES * 2; + } + + public int getCompactStorageSize() + { + // ensures exactCount and (count - exactCount) can safely be cast to (int) + Preconditions.checkState(canStoreCompact(), "Approximate histogram cannot be stored in compact form"); + + final long exactCount = getExactCount(); + if (exactCount == count) { + return Shorts.BYTES + 1 + Floats.BYTES * (int) exactCount; + } else { + return Shorts.BYTES + + 1 + + Floats.BYTES * (int) exactCount + + 1 + + Floats.BYTES * (int) (count - exactCount) + + Floats.BYTES * 2; + } + } + + public int getMaxStorageSize() + { + return getDenseStorageSize(); + } + + /** + * Returns the minimum number of bytes required to store this ApproximateHistogram object + * + * @return required number of bytes + */ + public int getMinStorageSize() + { + // sparse is always small than dense, so no need to check + if (canStoreCompact() && getCompactStorageSize() < getSparseStorageSize()) { + return getCompactStorageSize(); + } else { + return getSparseStorageSize(); + } + } + + /** + * Checks whether this approximate histogram can be stored in a compact form + * + * @return true if yes, false otherwise + */ + public boolean canStoreCompact() + { + final long exactCount = getExactCount(); + return ( + size <= Short.MAX_VALUE + && exactCount <= Byte.MAX_VALUE + && (count - exactCount) <= Byte.MAX_VALUE + ); + } + + /** + * Writes the representation of this ApproximateHistogram object to the given byte-buffer + * + * @param buf + */ + public void toBytes(ByteBuffer buf) + { + if (canStoreCompact() && getCompactStorageSize() < getSparseStorageSize()) { + // store compact + toBytesCompact(buf); + } else { + // store sparse + toBytesSparse(buf); + } + } + + /** + * Writes the dense representation of this ApproximateHistogram object to the given byte-buffer + * + * Requires 16 + 12 * size bytes of storage + * + * @param buf + */ + public void toBytesDense(ByteBuffer buf) + { + buf.putInt(size); + buf.putInt(binCount); + + buf.asFloatBuffer().put(positions); + buf.position(buf.position() + Floats.BYTES * positions.length); + buf.asLongBuffer().put(bins); + buf.position(buf.position() + Longs.BYTES * bins.length); + + buf.putFloat(min); + buf.putFloat(max); + } + + /** + * Writes the sparse representation of this ApproximateHistogram object to the given byte-buffer + * + * Requires 16 + 12 * binCount bytes of storage + * + * @param buf ByteBuffer to write object to + */ + public void toBytesSparse(ByteBuffer buf) + { + buf.putInt(size); + buf.putInt(-1 * binCount); // use negative binCount to indicate sparse storage + for (int i = 0; i < binCount; ++i) { + buf.putFloat(positions[i]); + } + for (int i = 0; i < binCount; ++i) { + buf.putLong(bins[i]); + } + buf.putFloat(min); + buf.putFloat(max); + } + + /** + * Returns a compact byte-buffer representation of this ApproximateHistogram object + * storing actual values as opposed to histogram bins + * + * Requires 3 + 4 * count bytes of storage with count <= 127 + * + * @param buf + */ + public void toBytesCompact(ByteBuffer buf) + { + Preconditions.checkState(canStoreCompact(), "Approximate histogram cannot be stored in compact form"); + + buf.putShort((short) (-1 * size)); // use negative size to indicate compact storage + + final long exactCount = getExactCount(); + if (exactCount != count) { + // use negative count to indicate approximate bins + buf.put((byte) (-1 * (count - exactCount))); + + // store actual values instead of bins + for (int i = 0; i < binCount; ++i) { + // repeat each value bins[i] times for approximate bins + if ((bins[i] & APPROX_FLAG_BIT) != 0) { + for (int k = 0; k < (bins[i] & COUNT_BITS); ++k) { + buf.putFloat(positions[i]); + } + } + } + + // tack on min and max since they may be lost int the approximate bins + buf.putFloat(min); + buf.putFloat(max); + } + + buf.put((byte) exactCount); + // store actual values instead of bins + for (int i = 0; i < binCount; ++i) { + // repeat each value bins[i] times for exact bins + if ((bins[i] & APPROX_FLAG_BIT) == 0) { + for (int k = 0; k < (bins[i] & COUNT_BITS); ++k) { + buf.putFloat(positions[i]); + } + } + } + } + + /** + * Constructs an Approximate Histogram object from the given byte-array representation + * + * @param bytes + * + * @return + */ + public static ApproximateHistogram fromBytes(byte[] bytes) + { + ByteBuffer buf = ByteBuffer.wrap(bytes); + return fromBytes(buf); + } + + /** + * Constructs an ApproximateHistogram object from the given dense byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytesDense(ByteBuffer buf) + { + int size = buf.getInt(); + int binCount = buf.getInt(); + + float[] positions = new float[size]; + long[] bins = new long[size]; + + buf.asFloatBuffer().get(positions); + buf.position(buf.position() + Floats.BYTES * positions.length); + buf.asLongBuffer().get(bins); + buf.position(buf.position() + Longs.BYTES * bins.length); + + float min = buf.getFloat(); + float max = buf.getFloat(); + + return new ApproximateHistogram(binCount, positions, bins, min, max); + } + + /** + * Constructs an ApproximateHistogram object from the given dense byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytesSparse(ByteBuffer buf) + { + int size = buf.getInt(); + int binCount = -1 * buf.getInt(); + + float[] positions = new float[size]; + long[] bins = new long[size]; + + for (int i = 0; i < binCount; ++i) { + positions[i] = buf.getFloat(); + } + for (int i = 0; i < binCount; ++i) { + bins[i] = buf.getLong(); + } + + float min = buf.getFloat(); + float max = buf.getFloat(); + + return new ApproximateHistogram(binCount, positions, bins, min, max); + } + + /** + * Constructs an ApproximateHistogram object from the given compact byte-buffer representation + * + * @param buf + * + * @return + */ + public static ApproximateHistogram fromBytesCompact(ByteBuffer buf) + { + short size = (short) (-1 * buf.getShort()); + byte count = buf.get(); + + if (count >= 0) { + // only exact bins + ApproximateHistogram histogram = new ApproximateHistogram(size); + for (int i = 0; i < count; ++i) { + histogram.offer(buf.getFloat()); + } + return histogram; + } else { + byte approxCount = (byte) (-1 * count); + + Map