Aggregations: Adds serialisation of sigma to extended_stats_bucket pipeline aggregation

Previously the sigma variable in the `extended_stats_bucket` pipeline aggregation was not being serialised in `ExtendedStatsBucketPipelineAggregator`. This PR fixes that.

It also corrects the initial value of sumOfSquares to be 0.

Closes #17701
This commit is contained in:
Colin Goodheart-Smithe 2016-04-13 11:05:43 +01:00
parent 86a9365c53
commit f7ef600644
4 changed files with 89 additions and 14 deletions

View File

@ -111,15 +111,23 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
protected abstract void collectBucketValue(String bucketKey, Double bucketValue);
@Override
public void doReadFrom(StreamInput in) throws IOException {
public final void doReadFrom(StreamInput in) throws IOException {
format = in.readValueFormat();
gapPolicy = GapPolicy.readFrom(in);
innerReadFrom(in);
}
protected void innerReadFrom(StreamInput in) throws IOException {
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
public final void doWriteTo(StreamOutput out) throws IOException {
out.writeValueFormat(format);
gapPolicy.writeTo(out);
innerWriteTo(out);
}
protected void innerWriteTo(StreamOutput out) throws IOException {
}
}

View File

@ -107,14 +107,12 @@ public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAg
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
super.doReadFrom(in);
public void innerReadFrom(StreamInput in) throws IOException {
percents = in.readDoubleArray();
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
super.doWriteTo(out);
public void innerWriteTo(StreamOutput out) throws IOException {
out.writeDoubleArray(percents);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
@ -78,7 +79,7 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline
count = 0;
min = Double.POSITIVE_INFINITY;
max = Double.NEGATIVE_INFINITY;
sumOfSqrs = 1;
sumOfSqrs = 0;
}
@Override
@ -95,4 +96,13 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline
return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, format, pipelineAggregators, metadata);
}
@Override
protected void innerReadFrom(StreamInput in) throws IOException {
sigma = in.readDouble();
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeDouble(sigma);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats.Bounds;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucket;
@ -64,7 +65,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
public void setupSuiteScopeCluster() throws Exception {
assertAcked(client().admin().indices().prepareCreate("idx")
.addMapping("type", "tag", "type=keyword").get());
createIndex("idx_unmapped");
createIndex("idx_unmapped", "idx_gappy");
numDocs = randomIntBetween(6, 20);
interval = randomIntBetween(2, 5);
@ -86,6 +87,13 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
valueCounts[bucket]++;
}
for (int i = 0; i < 6; i++) {
// creates 6 documents where the value of the field is 0, 1, 2, 3,
// 3, 5
builders.add(client().prepareIndex("idx_gappy", "type", "" + i).setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i == 4 ? 3 : i).endObject()));
}
assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
for (int i = 0; i < 2; i++) {
builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
@ -95,6 +103,57 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
ensureSearchable();
}
/**
* Test for https://github.com/elastic/elasticsearch/issues/17701
*/
public void testGappyIndexWithSigma() {
double sigma = randomDoubleBetween(1.0, 6.0, true);
SearchResponse response = client().prepareSearch("idx_gappy")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(1L))
.addAggregation(extendedStatsBucket("extended_stats_bucket", "histo>_count").sigma(sigma)).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(6));
for (int i = 0; i < 6; ++i) {
long expectedDocCount;
if (i == 3) {
expectedDocCount = 2;
} else if (i == 4) {
expectedDocCount = 0;
} else {
expectedDocCount = 1;
}
Histogram.Bucket bucket = buckets.get(i);
assertThat("i: " + i, bucket, notNullValue());
assertThat("i: " + i, ((Number) bucket.getKey()).longValue(), equalTo((long) i));
assertThat("i: " + i, bucket.getDocCount(), equalTo(expectedDocCount));
}
ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket");
long count = 6L;
double sum = 1.0 + 1.0 + 1.0 + 2.0 + 0.0 + 1.0;
double sumOfSqrs = 1.0 + 1.0 + 1.0 + 4.0 + 0.0 + 1.0;
double avg = sum / count;
double var = (sumOfSqrs - ((sum * sum) / count)) / count;
double stdDev = Math.sqrt(var);
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getMin(), equalTo(0.0));
assertThat(extendedStatsBucketValue.getMax(), equalTo(2.0));
assertThat(extendedStatsBucketValue.getCount(), equalTo(count));
assertThat(extendedStatsBucketValue.getSum(), equalTo(sum));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avg));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSqrs));
assertThat(extendedStatsBucketValue.getVariance(), equalTo(var));
assertThat(extendedStatsBucketValue.getStdDeviation(), equalTo(stdDev));
assertThat(extendedStatsBucketValue.getStdDeviationBound(Bounds.LOWER), equalTo(avg - (sigma * stdDev)));
assertThat(extendedStatsBucketValue.getStdDeviationBound(Bounds.UPPER), equalTo(avg + (sigma * stdDev)));
}
public void testDocCountTopLevel() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
@ -113,7 +172,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
double sumOfSquares = 0;
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
@ -170,7 +229,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
double sumOfSquares = 0;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
@ -211,7 +270,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
double sumOfSquares = 0;
for (int i = 0; i < interval; ++i) {
Terms.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
@ -271,7 +330,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
double sumOfSquares = 0;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
@ -334,7 +393,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
double sumOfSquares = 0;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
@ -436,7 +495,7 @@ public class ExtendedStatsBucketIT extends ESIntegTestCase {
int aggTermsCount = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
double sumOfSquares = 0;
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());