Use t-digest as a dependency.

Our improvements to t-digest have been pushed upstream, and t-digest has also
gained some nice improvements around memory usage and faster quantile
estimation, so it makes sense to use it as a dependency now.

This also allows us to remove the test dependency on Apache Mahout.

Close #6142
Adrien Grand 2014-05-13 09:11:12 +02:00
parent 3aac594503
commit cc530b9037
10 changed files with 19 additions and 2268 deletions
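
As a rough usage sketch (not from this commit) of the upstream t-digest 3.0 API that the
percentiles aggregation now builds on -- AVLTreeDigest, Centroid, add(), quantile(), cdf(),
centroids() and centroidCount() are the upstream types and methods referenced by the diff
below; the class name TDigestSketch is illustrative only:

import com.tdunning.math.stats.AVLTreeDigest;
import com.tdunning.math.stats.Centroid;

public class TDigestSketch {
    public static void main(String[] args) {
        // compression trades accuracy for memory, like the aggregation's `compression` parameter
        AVLTreeDigest digest = new AVLTreeDigest(100);
        for (int i = 0; i < 100000; i++) {
            digest.add(Math.random());
        }
        System.out.println("p99 estimate: " + digest.quantile(0.99));
        System.out.println("cdf(0.5): " + digest.cdf(0.5));
        // the centroids are the summary that the new TDigestState serializes further down
        long totalWeight = 0;
        for (Centroid centroid : digest.centroids()) {
            totalWeight += centroid.count();
        }
        System.out.println(digest.centroidCount() + " centroids, total weight " + totalWeight);
    }
}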


@@ -184,7 +184,7 @@ This balance can be controlled using a `compression` parameter:
The TDigest algorithm uses a number of "nodes" to approximate percentiles -- the
more nodes available, the higher the accuracy (and large memory footprint) proportional
to the volume of data. The `compression` parameter limits the maximum number of
-nodes to `100 * compression`.
+nodes to `20 * compression`.
Therefore, by increasing the compression value, you can increase the accuracy of
your percentiles at the cost of more memory. Larger compression values also
@@ -192,7 +192,7 @@ make the algorithm slower since the underlying tree data structure grows in size
resulting in more expensive operations. The default compression value is
`100`.
-A "node" uses roughly 48 bytes of memory, so under worst-case scenarios (large amount
+A "node" uses roughly 32 bytes of memory, so under worst-case scenarios (large amount
of data which arrives sorted and in-order) the default settings will produce a
-TDigest roughly 480KB in size. In practice data tends to be more random and
+TDigest roughly 64KB in size. In practice data tends to be more random and
the TDigest will use less memory.
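
To put the updated numbers in perspective: with the default compression of 100 the structure
is capped at 20 * 100 = 2,000 nodes, and at roughly 32 bytes per node the worst case is about
2,000 * 32 = 64,000 bytes, i.e. the 64KB quoted above. The previous implementation allowed
100 * 100 = 10,000 nodes at roughly 48 bytes each, hence the earlier 480KB figure.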


@@ -75,10 +75,9 @@
<scope>test</scope>
</dependency>
<dependency>
-<groupId>org.apache.mahout</groupId>
-<artifactId>mahout-core</artifactId>
-<version>0.9</version>
-<scope>test</scope>
+<groupId>com.tdunning</groupId>
+<artifactId>t-digest</artifactId>
+<version>3.0</version>
</dependency>
<dependency>


@@ -1,259 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics.percentiles.tdigest;
import com.google.common.primitives.Longs;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import java.util.Arrays;
/** Specialized {@link RedBlackTree} where each node stores aggregated data for its sub-trees. */
public class GroupRedBlackTree extends RedBlackTree {
private long nextId;
private double[] centroids;
private long[] counts;
private int[] aggregatedSizes;
private long[] aggregatedCounts;
private long[] ids;
// used for comparisons
double tmpCentroid;
long tmpCount;
long tmpId;
public GroupRedBlackTree(int capacity) {
super(capacity);
nextId = 1;
centroids = new double[1+capacity];
aggregatedSizes = new int[1+capacity];
counts = new long[1+capacity];
aggregatedCounts = new long[1+capacity];
ids = new long[1+capacity];
}
// Internal node management
@Override
protected int compare(int node) {
final double centroid = mean(node);
int cmp = Double.compare(tmpCentroid, centroid);
if (cmp == 0) {
cmp = Long.compare(tmpId, ids[node]);
}
return cmp;
}
@Override
protected void copy(int node) {
centroids[node] = tmpCentroid;
counts[node] = tmpCount;
ids[node] = tmpId;
}
@Override
protected void merge(int node) {
throw new IllegalStateException("compare should never return 0");
}
@Override
protected void swap(int node1, int node2) {
super.swap(node1, node2);
for (int n = node1; n != NIL; n = parent(n)) {
fixAggregates(n);
}
}
@Override
protected int newNode() {
final int newNode = super.newNode();
if (newNode >= centroids.length) {
final int minSize = ArrayUtil.oversize(newNode + 1, RamUsageEstimator.NUM_BYTES_INT);
centroids = Arrays.copyOf(centroids, minSize);
counts = Arrays.copyOf(counts, minSize);
aggregatedSizes = Arrays.copyOf(aggregatedSizes, minSize);
aggregatedCounts = Arrays.copyOf(aggregatedCounts, minSize);
ids = Arrays.copyOf(ids, minSize);
}
return newNode;
}
private void fixAggregates(int node) {
final int left = left(node), right = right(node);
aggregatedCounts[node] = counts[node] + aggregatedCounts[left] + aggregatedCounts[right];
aggregatedSizes[node] = 1 + aggregatedSizes[left] + aggregatedSizes[right];
}
private void fixCounts(int node) {
final int left = left(node), right = right(node);
aggregatedCounts[node] = counts[node] + aggregatedCounts[left] + aggregatedCounts[right];
}
@Override
protected void rotateLeft(int node) {
super.rotateLeft(node);
fixAggregates(node);
fixAggregates(parent(node));
}
@Override
protected void rotateRight(int node) {
super.rotateRight(node);
fixAggregates(node);
fixAggregates(parent(node));
}
@Override
protected void beforeRemoval(int node) {
final long count = count(node);
for (int n = node; n != NIL; n = parent(n)) {
aggregatedCounts[n] -= count;
aggregatedSizes[n]--;
}
super.beforeRemoval(node);
}
@Override
protected void afterInsertion(int node) {
final long count = count(node);
aggregatedCounts[node] = count;
aggregatedSizes[node] = 1;
for (int n = node, p = parent(node); p != NIL; n = p, p = parent(n)) {
aggregatedCounts[p] += count;
aggregatedSizes[p] += 1;
}
super.afterInsertion(node);
}
// Public API
public double mean(int node) {
return centroids[node];
}
public long count(int node) {
return counts[node];
}
public long id(int node) {
return ids[node];
}
public void addGroup(double centroid, long count, long id) {
tmpCentroid = centroid;
tmpCount = count;
tmpId = id;
if (id >= nextId) {
nextId = id + 1;
}
addNode();
}
public void addGroup(double centroid, long count) {
addGroup(centroid, count, nextId++);
}
public boolean removeGroup(double centroid, int id) {
tmpCentroid = centroid;
tmpId = id;
final int nodeToRemove = getNode();
if (nodeToRemove != NIL) {
removeNode(nodeToRemove);
return true;
} else {
return false;
}
}
public void updateGroup(int node, double centroid, long count) {
tmpCentroid = centroid;
tmpId = id(node);
tmpCount = count;
final int prev = prevNode(node);
final int next = nextNode(node);
if ((prev == NIL || compare(prev) > 0) && (next == NIL || compare(next) < 0)) {
// we can update in place
copy(node);
for (int n = node; n != NIL; n = parent(n)) {
fixCounts(n);
}
} else {
removeNode(node);
addNode();
}
}
/** Return the last node whose centroid is strictly smaller than <code>centroid</code>. */
public int floorNode(double centroid) {
int floor = NIL;
for (int node = root(); node != NIL; ) {
final int cmp = Double.compare(centroid, mean(node));
if (cmp <= 0) {
node = left(node);
} else {
floor = node;
node = right(node);
}
}
return floor;
}
/** Return the first node that is greater than or equal to <code>centroid</code>. */
public int ceilingNode(double centroid) {
int ceiling = NIL;
for (int node = root(); node != NIL; ) {
final int cmp = Double.compare(mean(node), centroid);
if (cmp < 0) {
node = right(node);
} else {
ceiling = node;
node = left(node);
}
}
return ceiling;
}
/** Compute the number of elements and sum of counts for every entry that is strictly before <code>node</code>. */
public void headSum(int node, SizeAndSum sizeAndSum) {
if (node == NIL) {
sizeAndSum.size = 0;
sizeAndSum.sum = 0;
return;
}
final int left = left(node);
sizeAndSum.size = aggregatedSizes[left];
sizeAndSum.sum = aggregatedCounts[left];
for (int n = node, p = parent(node); p != NIL; n = p, p = parent(n)) {
if (n == right(p)) {
final int leftP = left(p);
sizeAndSum.size += 1 + aggregatedSizes[leftP];
sizeAndSum.sum += counts[p] + aggregatedCounts[leftP];
}
}
}
/** Wrapper around a size and a sum. */
public static class SizeAndSum {
public int size;
public long sum;
}
}
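
A condensed usage sketch of the GroupRedBlackTree being deleted above, derived from the
removed GroupTreeTests further down; the class name GroupRedBlackTreeSketch and the sample
values are illustrative only:

import com.carrotsearch.hppc.cursors.IntCursor;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.GroupRedBlackTree;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.GroupRedBlackTree.SizeAndSum;

public class GroupRedBlackTreeSketch {
    public static void main(String[] args) {
        // centroids live under dense int node ids; counts are aggregated per subtree
        GroupRedBlackTree tree = new GroupRedBlackTree(16);
        tree.addGroup(0.1, 3);   // centroid mean 0.1, weight 3
        tree.addGroup(0.7, 2);
        tree.addGroup(0.4, 5);

        SizeAndSum prefix = new SizeAndSum();
        for (IntCursor cursor : tree) {          // ascending order of centroid mean
            int node = cursor.value;
            tree.headSum(node, prefix);          // size and weight strictly before this node
            System.out.println("mean=" + tree.mean(node) + " count=" + tree.count(node)
                    + " centroidsBefore=" + prefix.size + " weightBefore=" + prefix.sum);
        }
    }
}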


@@ -1,737 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics.percentiles.tdigest;
import com.carrotsearch.hppc.IntArrayDeque;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.OpenBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* A red-black tree that identifies every node with a unique dense integer ID to make it easy to store node-specific data into
* parallel arrays. This implementation doesn't support more than 2B nodes.
*/
public abstract class RedBlackTree implements Iterable<IntCursor> {
protected static final int NIL = 0;
private static final boolean BLACK = false, RED = true;
private int size;
private int maxNode;
private int root;
private final IntArrayDeque unusedSlots;
private int[] leftNodes, rightNodes, parentNodes;
private final OpenBitSet colors;
/** Create a new instance able to store <code>capacity</code> without resizing. */
protected RedBlackTree(int capacity) {
size = 0;
maxNode = 1;
root = NIL;
leftNodes = new int[1+capacity];
rightNodes = new int[1+capacity];
parentNodes = new int[1+capacity];
colors = new OpenBitSet(1+capacity);
unusedSlots = new IntArrayDeque();
}
/** Return the identifier of the root of the tree. */
protected final int root() {
assert size > 0 || root == NIL;
return root;
}
/** Release a node */
protected void releaseNode(int node) {
unusedSlots.addLast(node);
parent(node, NIL);
left(node, NIL);
right(node, NIL);
}
/** Create a new node in this tree. */
protected int newNode() {
if (unusedSlots.isEmpty()) {
final int slot = maxNode++;
if (maxNode > leftNodes.length) {
final int minSize = ArrayUtil.oversize(maxNode, RamUsageEstimator.NUM_BYTES_INT);
leftNodes = Arrays.copyOf(leftNodes, minSize);
rightNodes = Arrays.copyOf(rightNodes, minSize);
parentNodes = Arrays.copyOf(parentNodes, minSize);
colors.ensureCapacity(leftNodes.length);
}
return slot;
} else {
return unusedSlots.removeLast();
}
}
/** Return the number of nodes in this tree. */
public int size() {
assert size == maxNode - unusedSlots.size() - 1 : size + " != " + (maxNode - unusedSlots.size() - 1);
return size;
}
private boolean color(int node) {
return colors.get(node);
}
private void color(int node, boolean color) {
assert node != NIL;
if (color) {
colors.fastSet(node);
} else {
colors.fastClear(node);
}
}
/** Return the parent of the given node. */
protected final int parent(int node) {
return parentNodes[node];
}
private void parent(int node, int parent) {
assert node != NIL;
parentNodes[node] = parent;
}
/** Return the left child of the given node. */
protected final int left(int node) {
assert node != NIL;
return leftNodes[node];
}
protected void left(int node, int leftNode) {
assert node != NIL;
leftNodes[node] = leftNode;
}
/** Return the right child of the given node. */
protected final int right(int node) {
assert node != NIL;
return rightNodes[node];
}
private void right(int node, int rightNode) {
assert node != NIL;
rightNodes[node] = rightNode;
}
// return the number of black nodes to go through up to leaves
// to use within assertions only
private int numBlackNode(int node) {
assert assertConsistent(node);
if (node == NIL) {
return 1;
} else {
final int numLeft = numBlackNode(left(node));
final int numRight = numBlackNode(right(node));
assert numLeft == numRight : numLeft + " " + numRight;
int num = numLeft;
if (color(node) == BLACK) {
++num;
}
return num;
}
}
// check consistency of parent/left/right
private boolean assertConsistent(int node) {
if (node == NIL) {
return true;
}
final int parent = parent(node);
if (parent == NIL) {
assert node == root;
} else {
assert node == left(parent) || node == right(parent);
}
final int left = left(node);
if (left != NIL) {
assert parent(left) == node;
}
final int right = right(node);
if (right != NIL) {
assert parent(right) == node;
}
return true;
}
// for testing
public void assertConsistent() {
numBlackNode(root);
}
/** Rotate left the subtree under <code>n</code> */
protected void rotateLeft(int n) {
final int r = right(n);
final int lr = left(r);
right(n, left(r));
if (lr != NIL) {
parent(lr, n);
}
final int p = parent(n);
parent(r, p);
if (p == NIL) {
root = r;
} else if (left(p) == n) {
left(p, r);
} else {
assert right(p) == n;
right(p, r);
}
left(r, n);
parent(n, r);
}
/** Rotate right the subtree under <code>n</code> */
protected void rotateRight(int n) {
final int l = left(n);
final int rl = right(l);
left(n, rl);
if (rl != NIL) {
parent(rl, n);
}
final int p = parent(n);
parent(l, p);
if (p == NIL) {
root = l;
} else if (right(p) == n) {
right(p, l);
} else {
assert left(p) == n;
left(p, l);
}
right(l, n);
parent(n, l);
}
/** Called after insertions. Base implementation just balances the tree,
* see http://en.wikipedia.org/wiki/Red%E2%80%93black_tree#Insertion */
protected void afterInsertion(int node) {
color(node, RED);
insertCase1(node);
}
private void insertCase1(int node) {
final int parent = parent(node);
if (parent == NIL) {
assert node == root;
color(node, BLACK);
} else {
insertCase2(node, parent);
}
}
private void insertCase2(int node, int parent) {
if (color(parent) != BLACK) {
insertCase3(node, parent);
}
}
private void insertCase3(int node, int parent) {
final int grandParent = parent(parent);
assert grandParent != NIL;
final int uncle;
if (parent == left(grandParent)) {
uncle = right(grandParent);
} else {
assert parent == right(grandParent);
uncle = left(grandParent);
}
if (uncle != NIL && color(uncle) == RED) {
color(parent, BLACK);
color(uncle, BLACK);
color(grandParent, RED);
insertCase1(grandParent);
} else {
insertCase4(node, parent, grandParent);
}
}
private void insertCase4(int node, int parent, int grandParent) {
if (node == right(parent) && parent == left(grandParent)) {
rotateLeft(parent);
node = left(node);
} else if (node == left(parent) && parent == right(grandParent)) {
rotateRight(parent);
node = right(node);
}
insertCase5(node);
}
private void insertCase5(int node) {
final int parent = parent(node);
final int grandParent = parent(parent);
color(parent, BLACK);
color(grandParent, RED);
if (node == left(parent)) {
rotateRight(grandParent);
} else {
assert node == right(parent);
rotateLeft(grandParent);
}
}
/** Called before the removal of a node. */
protected void beforeRemoval(int node) {}
// see http://en.wikipedia.org/wiki/Red%E2%80%93black_tree#Removal
/** Called after removal. Base implementation just balances the tree,
* see http://en.wikipedia.org/wiki/Red%E2%80%93black_tree#Removal */
protected void afterRemoval(int node) {
assert node != NIL;
if (color(node) == BLACK) {
removeCase1(node);
}
}
private int sibling(int node, int parent) {
final int left = left(parent);
if (node == left) {
return right(parent);
} else {
assert node == right(parent);
return left;
}
}
private void removeCase1(int node) {
if (parent(node) != NIL) {
removeCase2(node);
}
}
private void removeCase2(int node) {
final int parent = parent(node);
final int sibling = sibling(node, parent);
if (color(sibling) == RED) {
color(sibling, BLACK);
color(parent, RED);
if (node == left(parent)) {
rotateLeft(parent);
} else {
assert node == right(parent);
rotateRight(parent);
}
}
removeCase3(node);
}
private void removeCase3(int node) {
final int parent = parent(node);
final int sibling = sibling(node, parent);
if (color(parent) == BLACK && sibling != NIL && color(sibling) == BLACK
&& color(left(sibling)) == BLACK && color(right(sibling)) == BLACK) {
color(sibling, RED);
removeCase1(parent);
} else {
removeCase4(node, parent, sibling);
}
}
private void removeCase4(int node, int parent, int sibling) {
if (color(parent) == RED && sibling != NIL && color(sibling) == BLACK
&& color(left(sibling)) == BLACK && color(right(sibling)) == BLACK) {
color(sibling, RED);
color(parent, BLACK);
} else {
removeCase5(node, parent, sibling);
}
}
private void removeCase5(int node, int parent, int sibling) {
if (color(sibling) == BLACK) {
if (node == left(parent) && sibling != NIL && color(left(sibling)) == RED && color(right(sibling)) == BLACK) {
color(sibling, RED);
color(left(sibling), BLACK);
rotateRight(sibling);
} else if (node == right(parent) && sibling != NIL && color(left(sibling)) == BLACK && color(right(sibling)) == RED) {
color(sibling, RED);
color(right(sibling), BLACK);
rotateLeft(sibling);
}
}
removeCase6(node);
}
private void removeCase6(int node) {
final int parent = parent(node);
final int sibling = sibling(node, parent);
color(sibling, color(parent));
color(parent, BLACK);
if (node == left(parent)) {
color(right(sibling), BLACK);
rotateLeft(parent);
} else {
assert node == right(parent);
color(left(sibling), BLACK);
rotateRight(parent);
}
}
/** Compare to <code>node</code>. */
protected abstract int compare(int node);
/** Copy data used for comparison into <code>node</code>. */
protected abstract void copy(int node);
/** Merge data used for comparison into <code>node</code>. */
protected abstract void merge(int node);
/** Add a node to the tree. */
public boolean addNode() {
int newNode = NIL;
if (size == 0) {
newNode = root = newNode();
copy(root);
} else {
int parent = NIL, node = root;
int cmp;
do {
cmp = compare(node);
if (cmp < 0) {
parent = node;
node = left(node);
} else if (cmp > 0) {
parent = node;
node = right(node);
} else {
merge(node);
return false;
}
} while (node != NIL);
newNode = newNode();
copy(newNode);
if (cmp < 0) {
left(parent, newNode);
} else {
assert cmp > 0;
right(parent, newNode);
}
parent(newNode, parent);
}
++size;
afterInsertion(newNode);
return true;
}
public int getNode() {
if (size() == 0) {
return NIL;
}
int node = root;
do {
final int cmp = compare(node);
if (cmp < 0) {
node = left(node);
} else if (cmp > 0) {
node = right(node);
} else {
return node;
}
} while (node != NIL);
return NIL;
}
/** Swap two nodes. */
protected void swap(int node1, int node2) {
final int parent1 = parent(node1);
final int parent2 = parent(node2);
if (parent1 != NIL) {
if (node1 == left(parent1)) {
left(parent1, node2);
} else {
assert node1 == right(parent1);
right(parent1, node2);
}
} else {
assert root == node1;
root = node2;
}
if (parent2 != NIL) {
if (node2 == left(parent2)) {
left(parent2, node1);
} else {
assert node2 == right(parent2);
right(parent2, node1);
}
} else {
assert root == node2;
root = node1;
}
parent(node1, parent2);
parent(node2, parent1);
final int left1 = left(node1);
final int left2 = left(node2);
left(node1, left2);
if (left2 != NIL) {
parent(left2, node1);
}
left(node2, left1);
if (left1 != NIL) {
parent(left1, node2);
}
final int right1 = right(node1);
final int right2 = right(node2);
right(node1, right2);
if (right2 != NIL) {
parent(right2, node1);
}
right(node2, right1);
if (right1 != NIL) {
parent(right1, node2);
}
final boolean color1 = color(node1);
final boolean color2 = color(node2);
color(node1, color2);
color(node2, color1);
assertConsistent(node1);
assertConsistent(node2);
}
/** Remove a node from the tree. */
public void removeNode(int nodeToRemove) {
assert nodeToRemove != NIL;
if (left(nodeToRemove) != NIL && right(nodeToRemove) != NIL) {
final int next = nextNode(nodeToRemove);
if (next != NIL) {
swap(nodeToRemove, next);
//copy(next, nodeToRemove);
//nodeToRemove = next;
}
}
assert left(nodeToRemove) == NIL || right(nodeToRemove) == NIL;
final int left = left(nodeToRemove);
int child = left != NIL ? left : right(nodeToRemove);
beforeRemoval(nodeToRemove);
if (child != NIL) {
final int parent = parent(nodeToRemove);
parent(child, parent);
if (parent != NIL) {
if (nodeToRemove == left(parent)) {
left(parent, child);
} else {
assert nodeToRemove == right(parent);
right(parent, child);
}
} else {
assert nodeToRemove == root;
root = child;
}
if (color(nodeToRemove) == BLACK) {
color(child, BLACK);
} else {
afterRemoval(child);
}
} else {
// no children
final int parent = parent(nodeToRemove);
if (parent == NIL) {
assert nodeToRemove == root;
root = NIL;
} else {
afterRemoval(nodeToRemove);
if (nodeToRemove == left(parent)) {
left(parent, NIL);
} else {
assert nodeToRemove == right(parent);
right(parent, NIL);
}
}
}
releaseNode(nodeToRemove);
--size;
}
/** Return the least node under <code>node</code>. */
protected final int first(int node) {
if (node == NIL) {
return NIL;
}
while (true) {
final int left = left(node);
if (left == NIL) {
break;
}
node = left;
}
return node;
}
/** Return the largest node under <code>node</code>. */
protected final int last(int node) {
while (true) {
final int right = right(node);
if (right == NIL) {
break;
}
node = right;
}
return node;
}
/** Return the previous node. */
public final int prevNode(int node) {
final int left = left(node);
if (left != NIL) {
return last(left);
} else {
int parent = parent(node);
while (parent != NIL && node == left(parent)) {
node = parent;
parent = parent(parent);
}
return parent;
}
}
/** Return the next node. */
public final int nextNode(int node) {
final int right = right(node);
if (right != NIL) {
return first(right);
} else {
int parent = parent(node);
while (parent != NIL && node == right(parent)) {
node = parent;
parent = parent(parent);
}
return parent;
}
}
@Override
public Iterator<IntCursor> iterator() {
return iterator(first(root));
}
private Iterator<IntCursor> iterator(final int startNode) {
return new UnmodifiableIterator<IntCursor>() {
boolean nextSet;
final IntCursor cursor;
{
cursor = new IntCursor();
cursor.index = -1;
cursor.value = startNode;
nextSet = cursor.value != NIL;
}
boolean computeNext() {
if (cursor.value != NIL) {
cursor.value = RedBlackTree.this.nextNode(cursor.value);
}
return nextSet = (cursor.value != NIL);
}
@Override
public boolean hasNext() {
return nextSet || computeNext();
}
@Override
public IntCursor next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
nextSet = false;
return cursor;
}
};
}
public Iterator<IntCursor> reverseIterator() {
return reverseIterator(last(root));
}
private Iterator<IntCursor> reverseIterator(final int startNode) {
return new UnmodifiableIterator<IntCursor>() {
boolean nextSet;
final IntCursor cursor;
{
cursor = new IntCursor();
cursor.index = -1;
cursor.value = startNode;
nextSet = cursor.value != NIL;
}
boolean computeNext() {
if (cursor.value != NIL) {
cursor.value = RedBlackTree.this.prevNode(cursor.value);
}
return nextSet = (cursor.value != NIL);
}
@Override
public boolean hasNext() {
return nextSet || computeNext();
}
@Override
public IntCursor next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
nextSet = false;
return cursor;
}
};
}
/** Return a view over the nodes that are stored after <code>startNode</code> in the tree. */
public Iterable<IntCursor> tailSet(final int startNode) {
return new Iterable<IntCursor>() {
@Override
public Iterator<IntCursor> iterator() {
return RedBlackTree.this.iterator(startNode);
}
};
}
/** Return a reversed view over the elements of this tree. */
public Iterable<IntCursor> reverseSet() {
return new Iterable<IntCursor>() {
@Override
public Iterator<IntCursor> iterator() {
return RedBlackTree.this.reverseIterator();
}
};
}
}


@@ -17,327 +17,31 @@
package org.elasticsearch.search.aggregations.metrics.percentiles.tdigest;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import com.tdunning.math.stats.AVLTreeDigest;
import com.tdunning.math.stats.Centroid;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.GroupRedBlackTree.SizeAndSum;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
* Fork of https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/TDigest.java
* Modified for less object allocation, faster estimations and integration with Elasticsearch serialization.
* Extension of {@link TDigest} with custom serialization.
*/
public class TDigestState {
public class TDigestState extends AVLTreeDigest {
private final Random gen;
private double compression = 100;
private GroupRedBlackTree summary;
private long count = 0;
private final SizeAndSum sizeAndSum = new SizeAndSum();
private final double compression;
/**
* A histogram structure that will record a sketch of a distribution.
*
* @param compression How should accuracy be traded for size? A value of N here will give quantile errors
* almost always less than 3/N with considerably smaller errors expected for extreme
* quantiles. Conversely, you should expect to track about 5 N centroids for this
* accuracy.
*/
public TDigestState(double compression) {
super(compression);
this.compression = compression;
gen = new Random();
summary = new GroupRedBlackTree((int) compression);
}
/**
* Adds a sample to a histogram.
*
* @param x The value to add.
*/
public void add(double x) {
add(x, 1);
}
/**
* Adds a sample to a histogram.
*
* @param x The value to add.
* @param w The weight of this point.
*/
public void add(double x, long w) {
int startNode = summary.floorNode(x);
if (startNode == RedBlackTree.NIL) {
startNode = summary.ceilingNode(x);
}
if (startNode == RedBlackTree.NIL) {
summary.addGroup(x, w);
count = w;
} else {
double minDistance = Double.POSITIVE_INFINITY;
int lastNeighbor = 0;
summary.headSum(startNode, sizeAndSum);
final int headSize = sizeAndSum.size;
int i = headSize;
for (int node = startNode; node != RedBlackTree.NIL; node = summary.nextNode(node)) {
double z = Math.abs(summary.mean(node) - x);
if (z <= minDistance) {
minDistance = z;
lastNeighbor = i;
} else {
break;
}
i++;
}
int closest = RedBlackTree.NIL;
long sum = sizeAndSum.sum;
i = headSize;
double n = 1;
for (int node = startNode; node != RedBlackTree.NIL; node = summary.nextNode(node)) {
if (i > lastNeighbor) {
break;
}
double z = Math.abs(summary.mean(node) - x);
double q = (sum + summary.count(node) / 2.0) / count;
double k = 4 * count * q * (1 - q) / compression;
// this slightly clever selection method improves accuracy with lots of repeated points
if (z == minDistance && summary.count(node) + w <= k) {
if (gen.nextDouble() < 1 / n) {
closest = node;
}
n++;
}
sum += summary.count(node);
i++;
}
if (closest == RedBlackTree.NIL) {
summary.addGroup(x, w);
} else {
double centroid = summary.mean(closest);
long count = summary.count(closest);
count += w;
centroid += w * (x - centroid) / count;
summary.updateGroup(closest, centroid, count);
}
count += w;
if (summary.size() > 100 * compression) {
// something such as sequential ordering of data points
// has caused a pathological expansion of our summary.
// To fight this, we simply replay the current centroids
// in random order.
// this causes us to forget the diagnostic recording of data points
compress();
}
}
}
private int[] shuffleNodes(RedBlackTree tree) {
int[] nodes = new int[tree.size()];
int i = 0;
for (IntCursor cursor : tree) {
nodes[i++] = cursor.value;
}
assert i == tree.size();
for (i = tree.size() - 1; i > 0; --i) {
final int slot = gen.nextInt(i + 1);
final int tmp = nodes[slot];
nodes[slot] = nodes[i];
nodes[i] = tmp;
}
return nodes;
}
public void add(TDigestState other) {
final int[] shuffledNodes = shuffleNodes(other.summary);
for (int node : shuffledNodes) {
add(other.summary.mean(node), other.summary.count(node));
}
}
public static TDigestState merge(double compression, Iterable<TDigestState> subData) {
Preconditions.checkArgument(subData.iterator().hasNext(), "Can't merge 0 digests");
List<TDigestState> elements = Lists.newArrayList(subData);
int n = Math.max(1, elements.size() / 4);
TDigestState r = new TDigestState(compression);
for (int i = 0; i < elements.size(); i += n) {
if (n > 1) {
r.add(merge(compression, elements.subList(i, Math.min(i + n, elements.size()))));
} else {
r.add(elements.get(i));
}
}
return r;
}
public void compress() {
compress(summary);
}
private void compress(GroupRedBlackTree other) {
TDigestState reduced = new TDigestState(compression);
final int[] shuffledNodes = shuffleNodes(other);
for (int node : shuffledNodes) {
reduced.add(other.mean(node), other.count(node));
}
summary = reduced.summary;
}
/**
* Returns the number of samples represented in this histogram. If you want to know how many
* centroids are being used, try centroids().size().
*
* @return the number of samples that have been added.
*/
public long size() {
return count;
}
public GroupRedBlackTree centroids() {
return summary;
}
/**
* @param x the value at which the CDF should be evaluated
* @return the approximate fraction of all samples that were less than or equal to x.
*/
public double cdf(double x) {
GroupRedBlackTree values = summary;
if (values.size() == 0) {
return Double.NaN;
} else if (values.size() == 1) {
return x < values.mean(values.root()) ? 0 : 1;
} else {
double r = 0;
// we scan a across the centroids
Iterator<IntCursor> it = values.iterator();
int a = it.next().value;
// b is the look-ahead to the next centroid
int b = it.next().value;
// initially, we set left width equal to right width
double left = (values.mean(b) - values.mean(a)) / 2;
double right = left;
// scan to next to last element
while (it.hasNext()) {
if (x < values.mean(a) + right) {
return (r + values.count(a) * interpolate(x, values.mean(a) - left, values.mean(a) + right)) / count;
}
r += values.count(a);
a = b;
b = it.next().value;
left = right;
right = (values.mean(b) - values.mean(a)) / 2;
}
// for the last element, assume right width is same as left
left = right;
a = b;
if (x < values.mean(a) + right) {
return (r + values.count(a) * interpolate(x, values.mean(a) - left, values.mean(a) + right)) / count;
} else {
return 1;
}
}
}
/**
* @param q The quantile desired. Can be in the range [0,1].
* @return The minimum value x such that we think that the proportion of samples is <= x is q.
*/
public double quantile(double q) {
if (q < 0 || q > 1) {
throw new ElasticsearchIllegalArgumentException("q should be in [0,1], got " + q);
}
GroupRedBlackTree values = summary;
if (values.size() == 0) {
return Double.NaN;
} else if (values.size() == 1) {
return values.mean(values.root());
}
// if values were stored in a sorted array, index would be the offset we are interested in
final double index = q * (count - 1);
double previousMean = Double.NaN, previousIndex = 0;
long total = 0;
int next;
Iterator<IntCursor> it = centroids().iterator();
while (true) {
next = it.next().value;
final double nextIndex = total + (values.count(next) - 1.0) / 2;
if (nextIndex >= index) {
if (Double.isNaN(previousMean)) {
// special case 1: the index we are interested in is before the 1st centroid
if (nextIndex == previousIndex) {
return values.mean(next);
}
// assume values grow linearly between index previousIndex=0 and nextIndex2
int next2 = it.next().value;
final double nextIndex2 = total + values.count(next) + (values.count(next2) - 1.0) / 2;
previousMean = (nextIndex2 * values.mean(next) - nextIndex * values.mean(next2)) / (nextIndex2 - nextIndex);
}
// common case: we found two centroids previous and next so that the desired quantile is
// after 'previous' but before 'next'
return quantile(previousIndex, index, nextIndex, previousMean, values.mean(next));
} else if (!it.hasNext()) {
// special case 2: the index we are interested in is beyond the last centroid
// again, assume values grow linearly between index previousIndex and (count - 1)
// which is the highest possible index
final double nextIndex2 = count - 1;
final double nextMean2 = (values.mean(next) * (nextIndex2 - previousIndex) - previousMean * (nextIndex2 - nextIndex)) / (nextIndex - previousIndex);
return quantile(nextIndex, index, nextIndex2, values.mean(next), nextMean2);
}
total += values.count(next);
previousMean = values.mean(next);
previousIndex = nextIndex;
}
}
private static double quantile(double previousIndex, double index, double nextIndex, double previousMean, double nextMean) {
final double delta = nextIndex - previousIndex;
final double previousWeight = (nextIndex - index) / delta;
final double nextWeight = (index - previousIndex) / delta;
return previousMean * previousWeight + nextMean * nextWeight;
}
public int centroidCount() {
return summary.size();
}
public double compression() {
return compression;
}
private double interpolate(double x, double x0, double x1) {
return (x - x0) / (x1 - x0);
}
//===== elastic search serialization ======//
public static void write(TDigestState state, StreamOutput out) throws IOException {
out.writeDouble(state.compression);
out.writeVInt(state.summary.size());
for (IntCursor cursor : state.summary) {
final int node = cursor.value;
out.writeDouble(state.summary.mean(node));
out.writeVLong(state.summary.count(node));
out.writeVInt(state.centroidCount());
for (Centroid centroid : state.centroids()) {
out.writeDouble(centroid.mean());
out.writeVLong(centroid.count());
}
}
@@ -350,4 +54,5 @@ public class TDigestState {
}
return state;
}
}
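
The wire format written by write() above is simply the compression (double), the number of
centroids (vint), and each centroid's mean (double) and count (vlong). The matching read side
is truncated in this view; a hypothetical mirror of write() might look roughly like the
following -- the name readFrom() and the exact body are assumptions, as is the call to
upstream's add(double x, int w):

import java.io.IOException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestState;

public class TDigestStateWireSketch {
    // hypothetical mirror of TDigestState.write(); the real read method is elided above
    public static TDigestState readFrom(StreamInput in) throws IOException {
        double compression = in.readDouble();      // written first by write()
        TDigestState state = new TDigestState(compression);
        int centroids = in.readVInt();             // centroidCount()
        for (int i = 0; i < centroids; i++) {
            double mean = in.readDouble();
            long count = in.readVLong();
            state.add(mean, (int) count);          // upstream TDigest.add(double x, int w)
        }
        return state;
    }
}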


@@ -24,7 +24,6 @@ import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfo.DocValuesType;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexableFieldType;
-import org.apache.mahout.math.Arrays;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -39,6 +38,8 @@ import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
+import java.util.Arrays;
import static org.hamcrest.Matchers.*;
/**


@@ -1,479 +0,0 @@
package org.elasticsearch.search.aggregations.metrics;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Upstream: Stream-lib, master @ 704002a2d8fa01fa7e9868dae9d0c8bedd8e9427
* https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/quantile/GroupTree.java
*/
/**
* A tree containing TDigest.Group. This adds to the normal NavigableSet the
* ability to sum up the size of elements to the left of a particular group.
*/
public class GroupTree implements Iterable<GroupTree.Group> {
private int count;
int size;
private int depth;
private Group leaf;
private GroupTree left, right;
public GroupTree() {
count = size = depth = 0;
leaf = null;
left = right = null;
}
public GroupTree(Group leaf) {
size = depth = 1;
this.leaf = leaf;
count = leaf.count();
left = right = null;
}
public GroupTree(GroupTree left, GroupTree right) {
this.left = left;
this.right = right;
count = left.count + right.count;
size = left.size + right.size;
rebalance();
leaf = this.right.first();
}
public void add(Group group) {
if (size == 0) {
leaf = group;
depth = 1;
count = group.count();
size = 1;
return;
} else if (size == 1) {
int order = group.compareTo(leaf);
if (order < 0) {
left = new GroupTree(group);
right = new GroupTree(leaf);
} else if (order > 0) {
left = new GroupTree(leaf);
right = new GroupTree(group);
leaf = group;
}
} else if (group.compareTo(leaf) < 0) {
left.add(group);
} else {
right.add(group);
}
count += group.count();
size++;
depth = Math.max(left.depth, right.depth) + 1;
rebalance();
}
private void rebalance() {
int l = left.depth();
int r = right.depth();
if (l > r + 1) {
if (left.left.depth() > left.right.depth()) {
rotate(left.left.left, left.left.right, left.right, right);
} else {
rotate(left.left, left.right.left, left.right.right, right);
}
} else if (r > l + 1) {
if (right.left.depth() > right.right.depth()) {
rotate(left, right.left.left, right.left.right, right.right);
} else {
rotate(left, right.left, right.right.left, right.right.right);
}
} else {
depth = Math.max(left.depth(), right.depth()) + 1;
}
}
private void rotate(GroupTree a, GroupTree b, GroupTree c, GroupTree d) {
left = new GroupTree(a, b);
right = new GroupTree(c, d);
count = left.count + right.count;
size = left.size + right.size;
depth = Math.max(left.depth(), right.depth()) + 1;
leaf = right.first();
}
private int depth() {
return depth;
}
public int size() {
return size;
}
/**
* @return the number of items strictly before the current element
*/
public int headCount(Group base) {
if (size == 0) {
return 0;
} else if (left == null) {
return leaf.compareTo(base) < 0 ? 1 : 0;
} else {
if (base.compareTo(leaf) < 0) {
return left.headCount(base);
} else {
return left.size + right.headCount(base);
}
}
}
/**
* @return the sum of the size() function for all elements strictly before the current element.
*/
public int headSum(Group base) {
if (size == 0) {
return 0;
} else if (left == null) {
return leaf.compareTo(base) < 0 ? count : 0;
} else {
if (base.compareTo(leaf) <= 0) {
return left.headSum(base);
} else {
return left.count + right.headSum(base);
}
}
}
/**
* @return the first Group in this set
*/
public Group first() {
Preconditions.checkState(size > 0, "No first element of empty set");
if (left == null) {
return leaf;
} else {
return left.first();
}
}
/**
* Iterates through all groups in the tree.
*/
public Iterator<Group> iterator() {
return iterator(null);
}
/**
* Iterates through all of the Groups in this tree in ascending order of means
*
* @param start The place to start this subset. Remember that Groups are ordered by mean *and* id.
* @return An iterator that goes through the groups in order of mean and id starting at or after the
* specified Group.
*/
private Iterator<Group> iterator(final Group start) {
return new AbstractIterator<Group>() {
{
stack = new ArrayDeque<>();
push(GroupTree.this, start);
}
Deque<GroupTree> stack;
// recurses down to the leaf that is >= start
// pending right hand branches on the way are put on the stack
private void push(GroupTree z, Group start) {
while (z.left != null) {
if (start == null || start.compareTo(z.leaf) < 0) {
// remember we will have to process the right hand branch later
stack.push(z.right);
// note that there is no guarantee that z.left has any good data
z = z.left;
} else {
// if the left hand branch doesn't contain start, then no push
z = z.right;
}
}
// put the leaf value on the stack if it is valid
if (start == null || z.leaf.compareTo(start) >= 0) {
stack.push(z);
}
}
@Override
protected Group computeNext() {
GroupTree r = stack.poll();
while (r != null && r.left != null) {
// unpack r onto the stack
push(r, start);
r = stack.poll();
}
// at this point, r == null or r.left == null
// if r == null, stack is empty and we are done
// if r != null, then r.left != null and we have a result
if (r != null) {
return r.leaf;
}
return endOfData();
}
};
}
public void remove(Group base) {
Preconditions.checkState(size > 0, "Cannot remove from empty set");
if (size == 1) {
Preconditions.checkArgument(base.compareTo(leaf) == 0, "Element %s not found", base);
count = size = 0;
leaf = null;
} else {
if (base.compareTo(leaf) < 0) {
if (left.size > 1) {
left.remove(base);
count -= base.count();
size--;
rebalance();
} else {
size = right.size;
count = right.count;
depth = right.depth;
leaf = right.leaf;
left = right.left;
right = right.right;
}
} else {
if (right.size > 1) {
right.remove(base);
leaf = right.first();
count -= base.count();
size--;
rebalance();
} else {
size = left.size;
count = left.count;
depth = left.depth;
leaf = left.leaf;
right = left.right;
left = left.left;
}
}
}
}
/**
* @return the largest element less than or equal to base
*/
public Group floor(Group base) {
if (size == 0) {
return null;
} else {
if (size == 1) {
return base.compareTo(leaf) >= 0 ? leaf : null;
} else {
if (base.compareTo(leaf) < 0) {
return left.floor(base);
} else {
Group floor = right.floor(base);
if (floor == null) {
floor = left.last();
}
return floor;
}
}
}
}
public Group last() {
Preconditions.checkState(size > 0, "Cannot find last element of empty set");
if (size == 1) {
return leaf;
} else {
return right.last();
}
}
/**
* @return the smallest element greater than or equal to base.
*/
public Group ceiling(Group base) {
if (size == 0) {
return null;
} else if (size == 1) {
return base.compareTo(leaf) <= 0 ? leaf : null;
} else {
if (base.compareTo(leaf) < 0) {
Group r = left.ceiling(base);
if (r == null) {
r = right.first();
}
return r;
} else {
return right.ceiling(base);
}
}
}
/**
* @return the subset of elements equal to or greater than base.
*/
public Iterable<Group> tailSet(final Group start) {
return new Iterable<Group>() {
@Override
public Iterator<Group> iterator() {
return GroupTree.this.iterator(start);
}
};
}
public int sum() {
return count;
}
public void checkBalance() {
if (left != null) {
Preconditions.checkState(Math.abs(left.depth() - right.depth()) < 2, "Imbalanced");
int l = left.depth();
int r = right.depth();
Preconditions.checkState(depth == Math.max(l, r) + 1, "Depth doesn't match children");
Preconditions.checkState(size == left.size + right.size, "Sizes don't match children");
Preconditions.checkState(count == left.count + right.count, "Counts don't match children");
Preconditions.checkState(leaf.compareTo(right.first()) == 0, "Split is wrong %.5d != %.5d or %d != %d", leaf.mean(), right.first().mean(), leaf.id(), right.first().id());
left.checkBalance();
right.checkBalance();
}
}
public void print(int depth) {
for (int i = 0; i < depth; i++) {
System.out.print("| ");
}
int imbalance = Math.abs((left != null ? left.depth : 1) - (right != null ? right.depth : 1));
System.out.printf(Locale.ENGLISH, "%s%s, %d, %d, %d\n", (imbalance > 1 ? "* " : "") + (right != null && leaf.compareTo(right.first()) != 0 ? "+ " : ""), leaf, size, count, this.depth);
if (left != null) {
left.print(depth + 1);
right.print(depth + 1);
}
}
public static class Group implements Comparable<Group> {
private static final AtomicInteger uniqueCount = new AtomicInteger(1);
double centroid = 0;
int count = 0;
private int id;
private List<Double> actualData = null;
private Group(boolean record) {
id = uniqueCount.incrementAndGet();
if (record) {
actualData = Lists.newArrayList();
}
}
public Group(double x) {
this(false);
start(x, uniqueCount.getAndIncrement());
}
public Group(double x, int id) {
this(false);
start(x, id);
}
public Group(double x, int id, boolean record) {
this(record);
start(x, id);
}
private void start(double x, int id) {
this.id = id;
add(x, 1);
}
public void add(double x, int w) {
if (actualData != null) {
actualData.add(x);
}
count += w;
centroid += w * (x - centroid) / count;
}
public double mean() {
return centroid;
}
public int count() {
return count;
}
public int id() {
return id;
}
@Override
public String toString() {
return "Group{" +
"centroid=" + centroid +
", count=" + count +
'}';
}
@Override
public int hashCode() {
return id;
}
@Override
public int compareTo(Group o) {
int r = Double.compare(centroid, o.centroid);
if (r == 0) {
r = id - o.id;
}
return r;
}
public Iterable<? extends Double> data() {
return actualData;
}
public static Group createWeighted(double x, int w, Iterable<? extends Double> data) {
Group r = new Group(data != null);
r.add(x, w, data);
return r;
}
private void add(double x, int w, Iterable<? extends Double> data) {
if (actualData != null) {
if (data != null) {
for (Double old : data) {
actualData.add(old);
}
} else {
actualData.add(x);
}
}
count += w;
centroid += w * (x - centroid) / count;
}
}
}


@@ -1,100 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.search.aggregations.metrics.GroupTree.Group;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.GroupRedBlackTree;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.GroupRedBlackTree.SizeAndSum;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class GroupTreeTests extends ElasticsearchTestCase {
public void testDuel() {
GroupTree tree1 = new GroupTree();
GroupRedBlackTree tree2 = new GroupRedBlackTree(randomInt(100));
// Add elements
final int elements = scaledRandomIntBetween(100, 1000);
for (int i = 0; i < elements; ++i) {
final double centroid = randomDouble();
final int count = randomIntBetween(1, 5);
Group g = new Group(centroid, i);
g.add(centroid, count - 1);
tree1.add(g);
tree2.addGroup(centroid, count, i);
}
assertEquals(tree1, tree2);
// Remove
List<Group> toRemove = Lists.newArrayList();
for (Group group : tree1) {
if (randomBoolean()) {
toRemove.add(group);
}
}
Collections.shuffle(toRemove, getRandom());
for (Group group : toRemove) {
tree1.remove(group);
final boolean removed = tree2.removeGroup(group.mean(), group.id());
assertTrue(removed);
}
assertEquals(tree1, tree2);
}
public static void assertEquals(GroupTree tree1, GroupRedBlackTree tree2) {
assertEquals(tree1.size(), tree2.size());
Iterator<Group> groups1 = tree1.iterator();
Iterator<IntCursor> groups2 = tree2.iterator();
while (true) {
assertEquals(groups1.hasNext(), groups2.hasNext());
if (!groups1.hasNext()) {
break;
}
final Group next1 = groups1.next();
final IntCursor next2 = groups2.next();
assertEquals(next1.mean(), tree2.mean(next2.value), 0.0001);
assertEquals(next1.count(), tree2.count(next2.value));
}
assertConsistent(tree2);
}
public static void assertConsistent(GroupRedBlackTree tree) {
int i = 0;
long sum = 0;
for (IntCursor cursor : tree) {
final int node = cursor.value;
SizeAndSum s = new GroupRedBlackTree.SizeAndSum();
tree.headSum(node, s);
assertEquals(i, s.size);
assertEquals(sum, s.sum);
i++;
sum += tree.count(node);
}
}
}


@@ -1,185 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.RedBlackTree;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class RedBlackTreeTests extends ElasticsearchTestCase {
private static class IntRedBlackTree extends RedBlackTree {
private int tmpValue;
private int[] values;
private int[] counts;
IntRedBlackTree() {
super(10);
values = new int[5];
counts = new int[5];
}
@Override
protected int compare(int node) {
return Ints.compare(tmpValue, values[node]);
}
@Override
protected void merge(int node) {
counts[node] += 1;
}
@Override
protected void copy(int node) {
values[node] = tmpValue;
counts[node] = 1;
}
@Override
protected int newNode() {
final int newNode = super.newNode();
if (values.length <= newNode) {
values = ArrayUtil.grow(values, newNode + 1);
counts = Arrays.copyOf(counts, values.length);
}
return newNode;
}
public boolean add(int value) {
tmpValue = value;
return super.addNode();
}
public boolean remove(int value) {
tmpValue = value;
final int nodeToRemove = getNode();
if (nodeToRemove == NIL) {
return false;
} else {
super.removeNode(nodeToRemove);
return true;
}
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append("{ ");
for (IntCursor cursor : this) {
b.append(values[cursor.value] + "(" + counts[cursor.value] + "), ");
}
b.setLength(b.length() - 2);
b.append(" } ");
return b.toString();
}
}
public void testAdd() {
Map<Integer, Integer> map = Maps.newHashMap();
IntRedBlackTree tree = new IntRedBlackTree();
final int iters = scaledRandomIntBetween(1000, 10000);
for (int i = 0; i < iters; ++i) {
final int value = randomInt(200);
final boolean added = tree.add(value);
tree.assertConsistent();
assertEquals(!map.containsKey(value), added);
if (map.containsKey(value)) {
map.put(value, map.get(value) + 1);
} else {
map.put(value, 1);
}
assertEquals(map.size(), tree.size());
}
int size = 0;
int previousValue = Integer.MIN_VALUE;
for (IntCursor cursor : tree) {
++size;
final int value = tree.values[cursor.value];
assertTrue(previousValue < value);
assertEquals(map.get(value).intValue(), tree.counts[cursor.value]);
previousValue = value;
}
assertEquals(map.size(), size);
}
public void testRemove() {
final int numValues = scaledRandomIntBetween(200, 1000);
final FixedBitSet values = new FixedBitSet(numValues);
values.set(0, numValues);
IntRedBlackTree tree = new IntRedBlackTree();
for (int i = 0; i < numValues; ++i) {
tree.add(i);
}
final int iters = scaledRandomIntBetween(300, 1000);
for (int i = 0; i < iters; ++i) {
final int value = randomInt(numValues - 1);
final boolean removed = tree.remove(value);
assertEquals(removed, values.get(value));
values.clear(value);
assertEquals(values.cardinality(), tree.size());
tree.assertConsistent();
}
int size = 0;
int previousValue = Integer.MIN_VALUE;
for (IntCursor cursor : tree) {
++size;
final int value = tree.values[cursor.value];
assertTrue(previousValue < value);
assertTrue(values.get(value));
previousValue = value;
}
assertEquals(values.cardinality(), size);
}
public void testReverse() {
IntRedBlackTree tree = new IntRedBlackTree();
final int iters = scaledRandomIntBetween(1000, 10000);
for (int i = 0; i < iters; ++i) {
final int value = randomInt(2000);
tree.add(value);
}
List<Integer> sortedNodes = Lists.newArrayList();
for (IntCursor cursor : tree) {
sortedNodes.add(cursor.value);
}
List<Integer> reverseNodes = Lists.newArrayList();
for (IntCursor cursor : tree.reverseSet()) {
reverseNodes.add(cursor.value);
}
Collections.reverse(sortedNodes);
assertEquals(sortedNodes, reverseNodes);
}
}


@@ -1,194 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import com.google.common.collect.Lists;
import org.apache.mahout.math.jet.random.AbstractContinousDistribution;
import org.apache.mahout.math.jet.random.Gamma;
import org.apache.mahout.math.jet.random.Normal;
import org.apache.mahout.math.jet.random.Uniform;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestState;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import static org.hamcrest.Matchers.lessThan;
// imported tests from upstream's TDigestTest
public class TDigestStateTests extends ElasticsearchTestCase {
@Test
public void testUniform() {
Random gen = getRandom();
for (int i = 0; i < 10; i++) {
runTest(new Uniform(0, 1, gen), 100,
new double[]{0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999},
"uniform");
}
}
@Test
public void testGamma() {
// this Gamma distribution is very heavily skewed. The 0.1%-ile is 6.07e-30 while
// the median is 0.006 and the 99.9th %-ile is 33.6 while the mean is 1.
// this severe skew means that we have to have positional accuracy that
// varies by over 11 orders of magnitude.
Random gen = getRandom();
for (int i = 0; i < 10; i++) {
runTest(new Gamma(0.1, 0.1, gen), 100,
// new double[]{6.0730483624079e-30, 6.0730483624079e-20, 6.0730483627432e-10, 5.9339110446023e-03,
// 2.6615455373884e+00, 1.5884778179295e+01, 3.3636770117188e+01},
new double[]{0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999},
"gamma");
}
}
@Test
public void testNarrowNormal() {
// this mixture of a uniform and normal distribution has a very narrow peak which is centered
// near the median. Our system should be scale invariant and work well regardless.
final Random gen = getRandom();
AbstractContinousDistribution mix = new AbstractContinousDistribution() {
AbstractContinousDistribution normal = new Normal(0, 1e-5, gen);
AbstractContinousDistribution uniform = new Uniform(-1, 1, gen);
@Override
public double nextDouble() {
double x;
if (gen.nextDouble() < 0.5) {
x = uniform.nextDouble();
} else {
x = normal.nextDouble();
}
return x;
}
};
for (int i = 0; i < 10; i++) {
runTest(mix, 100, new double[]{0.001, 0.01, 0.1, 0.3, 0.5, 0.7, 0.9, 0.99, 0.999}, "mixture");
}
}
@Test
public void testRepeatedValues() {
final Random gen = getRandom();
// 5% of samples will be 0 or 1.0. 10% for each of the values 0.1 through 0.9
AbstractContinousDistribution mix = new AbstractContinousDistribution() {
@Override
public double nextDouble() {
return Math.rint(gen.nextDouble() * 10) / 10.0;
}
};
TDigestState dist = new TDigestState((double) 1000);
List<Double> data = Lists.newArrayList();
for (int i1 = 0; i1 < 100000; i1++) {
double x = mix.nextDouble();
data.add(x);
dist.add(x);
}
// I would be happier with 5x compression, but repeated values make things kind of weird
assertTrue("Summary is too large", dist.centroidCount() < 10 * (double) 1000);
// all quantiles should round to nearest actual value
for (int i = 0; i < 10; i++) {
double z = i / 10.0;
// we skip over troublesome points that are nearly halfway between
for (double delta : new double[] {0.01, 0.02, 0.03, 0.07, 0.08, 0.09}) {
double q = z + delta;
double cdf = dist.cdf(q);
// we also relax the tolerances for repeated values
assertEquals(String.format(Locale.ROOT, "z=%.1f, q = %.3f, cdf = %.3f", z, q, cdf), z + 0.05, cdf, 0.01);
double estimate = dist.quantile(q);
assertEquals(String.format(Locale.ROOT, "z=%.1f, q = %.3f, cdf = %.3f, estimate = %.3f", z, q, cdf, estimate), Math.rint(q * 10) / 10.0, estimate, 0.001);
}
}
}
@Test
public void testSequentialPoints() {
for (int i = 0; i < 10; i++) {
runTest(new AbstractContinousDistribution() {
double base = 0;
@Override
public double nextDouble() {
base += Math.PI * 1e-5;
return base;
}
}, 100, new double[]{0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999},
"sequential");
}
}
public void runTest(AbstractContinousDistribution gen, double sizeGuide, double[] qValues, String tag) {
final int len = 100000;
final TDigestState dist = new TDigestState(sizeGuide);
double[] data = new double[len];
for (int i = 0; i < len; ++i) {
double x = gen.nextDouble();
data[i] = x;
dist.add(x);
}
dist.compress();
Arrays.sort(data);
double[] xValues = qValues.clone();
for (int i = 0; i < qValues.length; i++) {
double ix = data.length * qValues[i] - 0.5;
int index = (int) Math.floor(ix);
double p = ix - index;
xValues[i] = data[index] * (1 - p) + data[index + 1] * p;
}
assertThat("Summary is too large", (double) dist.centroidCount(), lessThan(sizeGuide * 10));
int softErrors = 0;
for (int i = 0; i < xValues.length; i++) {
double x = xValues[i];
double q = qValues[i];
double estimate = dist.cdf(x);
assertEquals(q, estimate, 0.005);
estimate = cdf(dist.quantile(q), data);
if (Math.abs(q - estimate) > 0.005) {
softErrors++;
}
assertEquals(q, estimate, 0.012);
}
assertTrue(softErrors < 3);
}
private double cdf(final double x, double[] data) {
int n1 = 0;
int n2 = 0;
for (double v : data) {
n1 += (v < x) ? 1 : 0;
n2 += (v <= x) ? 1 : 0;
}
return (n1 + n2) / 2.0 / data.length;
}
}