From 397d5beae1878ad9329a32fe760c03d69f929de5 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 26 Aug 2015 14:20:35 -0400 Subject: [PATCH] Aggregations: Add stats_bucket / extended_stats_bucket pipeline aggregations These are the complements to the stats/extended_stats metric aggregations, and can be used to calculate a variety of statistics over buckets --- .../elasticsearch/search/SearchModule.java | 10 +- .../stats/extended/InternalExtendedStats.java | 2 +- .../pipeline/PipelineAggregatorBuilders.java | 10 + .../bucketmetrics/BucketMetricsParser.java | 2 +- .../stats/InternalStatsBucket.java | 68 +++ .../bucketmetrics/stats/StatsBucket.java | 29 ++ .../stats/StatsBucketBuilder.java | 30 ++ .../stats/StatsBucketParser.java | 40 ++ .../stats/StatsBucketPipelineAggregator.java | 122 +++++ .../stats/extended/ExtendedStatsBucket.java | 9 + .../extended/ExtendedStatsBucketBuilder.java | 47 ++ .../extended/ExtendedStatsBucketParser.java | 57 +++ ...ExtendedStatsBucketPipelineAggregator.java | 134 +++++ .../extended/InternalExtendedStatsBucket.java | 70 +++ .../pipeline/ExtendedStatsBucketIT.java | 478 ++++++++++++++++++ .../aggregations/pipeline/StatsBucketIT.java | 437 ++++++++++++++++ docs/reference/aggregations/pipeline.asciidoc | 2 + ...extended-stats-bucket-aggregation.asciidoc | 118 +++++ .../stats-bucket-aggregation.asciidoc | 108 ++++ 19 files changed, 1770 insertions(+), 3 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucket.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketParser.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucket.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketParser.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java create mode 100644 docs/reference/aggregations/pipeline/extended-stats-bucket-aggregation.asciidoc create mode 100644 docs/reference/aggregations/pipeline/stats-bucket-aggregation.asciidoc diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index b78fe06f8e9..419f6ec40e4 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -111,6 +111,10 @@ import 
org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketParser; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketParser; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser; @@ -300,6 +304,8 @@ public class SearchModule extends AbstractModule { multibinderPipelineAggParser.addBinding().to(MinBucketParser.class); multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class); multibinderPipelineAggParser.addBinding().to(SumBucketParser.class); + multibinderPipelineAggParser.addBinding().to(StatsBucketParser.class); + multibinderPipelineAggParser.addBinding().to(ExtendedStatsBucketParser.class); multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class); multibinderPipelineAggParser.addBinding().to(MovAvgParser.class); multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class); @@ -393,7 +399,9 @@ public class SearchModule extends AbstractModule { MinBucketPipelineAggregator.registerStreams(); AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); - PercentilesBucketPipelineAggregator.registerStreams(); + StatsBucketPipelineAggregator.registerStreams(); + ExtendedStatsBucketPipelineAggregator.registerStreams(); + PercentilesBucketPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java index 7fa4f2ed7b6..543c5907070 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java @@ -65,7 +65,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat private double sumOfSqrs; private double sigma; - InternalExtendedStats() {} // for serialization + protected InternalExtendedStats() {} // for serialization public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma, ValueFormatter formatter, List pipelineAggregators, Map metaData) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index 96df702072d..d80b087930c 100644 --- 
a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -22,6 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder; @@ -56,6 +58,14 @@ public final class PipelineAggregatorBuilders { return new SumBucketBuilder(name); } + public static final StatsBucketBuilder statsBucket(String name) { + return new StatsBucketBuilder(name); + } + + public static final ExtendedStatsBucketBuilder extendedStatsBucket(String name) { + return new ExtendedStatsBucketBuilder(name); + } + public static final PercentilesBucketBuilder percentilesBucket(String name) { return new PercentilesBucketBuilder(name); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java index f994d9314ae..287fb7b3402 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java @@ -117,4 +117,4 @@ public abstract class BucketMetricsParser implements PipelineAggregator.Parser { protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter, Map unparsedParams) throws ParseException; -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java new file mode 100644 index 00000000000..fd326f50d50 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.metrics.stats.InternalStats; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class InternalStatsBucket extends InternalStats implements StatsBucket { + + public final static Type TYPE = new Type("stats_bucket"); + + public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalStatsBucket readResult(StreamInput in) throws IOException { + InternalStatsBucket result = new InternalStatsBucket(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + public InternalStatsBucket(String name, long count, double sum, double min, double max, ValueFormatter formatter, + List pipelineAggregators, Map metaData) { + super(name, count, sum, min, max, formatter, pipelineAggregators, metaData); + } + + InternalStatsBucket() { + // For serialization + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalStats doReduce(List aggregations, ReduceContext reduceContext) { + throw new UnsupportedOperationException("Not supported"); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucket.java new file mode 100644 index 00000000000..0e158d2a122 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucket.java @@ -0,0 +1,29 @@ +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.elasticsearch.search.aggregations.metrics.stats.Stats; + +/** + * Statistics over a set of buckets + */ +public interface StatsBucket extends Stats { + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketBuilder.java new file mode 100644 index 00000000000..a8c19db8079 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; + +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder; + +public class StatsBucketBuilder extends BucketMetricsBuilder { + + public StatsBucketBuilder(String name) { + super(name, StatsBucketPipelineAggregator.TYPE.name()); + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketParser.java new file mode 100644 index 00000000000..b25044786ec --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketParser.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; + +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.util.Map; + +public class StatsBucketParser extends BucketMetricsParser { + @Override + public String type() { + return StatsBucketPipelineAggregator.TYPE.name(); + } + + @Override + protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, + ValueFormatter formatter, Map unparsedParams) { + return new StatsBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java new file mode 100644 index 00000000000..66726cef29e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { + + public final static Type TYPE = new Type("stats_bucket"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public StatsBucketPipelineAggregator readResult(StreamInput in) throws IOException { + StatsBucketPipelineAggregator result = new StatsBucketPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + InternalStatsBucket.registerStreams(); + } + + private double sum = 0; + private long count = 0; + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + + protected StatsBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter, + Map metaData) { + super(name, bucketsPaths, gapPolicy, formatter, metaData); + } + + StatsBucketPipelineAggregator() { + // For serialization + } + + @Override + public Type type() { + return TYPE; + } + + @Override + protected void preCollection() { + sum = 0; + count = 0; + min = Double.POSITIVE_INFINITY; + max = Double.NEGATIVE_INFINITY; + } + + @Override + protected void collectBucketValue(String bucketKey, Double bucketValue) { + sum += bucketValue; + min = Math.min(min, bucketValue); + max = Math.max(max, bucketValue); + count += 1; + } + + @Override + protected InternalAggregation buildAggregation(List pipelineAggregators, Map metadata) { + return new InternalStatsBucket(name(), count, sum, min, max, formatter, pipelineAggregators, metadata); + } + + public static class Factory extends PipelineAggregatorFactory { + + private final ValueFormatter formatter; + private final GapPolicy gapPolicy; + + public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter) { + super(name, TYPE.name(), bucketsPaths); + this.gapPolicy = gapPolicy; + this.formatter = formatter; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new StatsBucketPipelineAggregator(name, bucketsPaths, gapPolicy, formatter, metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, + List pipelineAggregatorFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + } + + } + +} diff --git 
a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucket.java new file mode 100644 index 00000000000..426b5d7809a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucket.java @@ -0,0 +1,9 @@ +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended; + +import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; + +/** + * Extended Statistics over a set of buckets + */ +public interface ExtendedStatsBucket extends ExtendedStats { +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketBuilder.java new file mode 100644 index 00000000000..25880bdcabe --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder; + +import java.io.IOException; + +public class ExtendedStatsBucketBuilder extends BucketMetricsBuilder { + + Double sigma; + + public ExtendedStatsBucketBuilder(String name) { + super(name, ExtendedStatsBucketPipelineAggregator.TYPE.name()); + } + + public ExtendedStatsBucketBuilder sigma(Double sigma) { + this.sigma = sigma; + return this; + } + + @Override + protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException { + if (sigma != null) { + builder.field(ExtendedStatsBucketParser.SIGMA.getPreferredName(), sigma); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketParser.java new file mode 100644 index 00000000000..b4d1f18f7b4 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketParser.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.text.ParseException; +import java.util.Map; + +public class ExtendedStatsBucketParser extends BucketMetricsParser { + static final ParseField SIGMA = new ParseField("sigma"); + + @Override + public String type() { + return ExtendedStatsBucketPipelineAggregator.TYPE.name(); + } + + @Override + protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, + ValueFormatter formatter, Map unparsedParams) throws ParseException { + + double sigma = 2.0; + Object param = unparsedParams.get(SIGMA.getPreferredName()); + + if (param != null) { + if (param instanceof Double) { + sigma = (Double) param; + unparsedParams.remove(SIGMA.getPreferredName()); + } else { + throw new ParseException("Parameter [" + SIGMA.getPreferredName() + "] must be a Double, type `" + + param.getClass().getSimpleName() + "` provided instead", 0); + } + } + return new ExtendedStatsBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, sigma, gapPolicy, formatter); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java new file mode 100644 index 00000000000..6a7f2bec3f0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { + + public final static Type TYPE = new Type("extended_stats_bucket"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public ExtendedStatsBucketPipelineAggregator readResult(StreamInput in) throws IOException { + ExtendedStatsBucketPipelineAggregator result = new ExtendedStatsBucketPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + InternalExtendedStatsBucket.registerStreams(); + } + + private double sum = 0; + private long count = 0; + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + private double sumOfSqrs = 1; + private double sigma; + + protected ExtendedStatsBucketPipelineAggregator(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy, + ValueFormatter formatter, Map metaData) { + super(name, bucketsPaths, gapPolicy, formatter, metaData); + this.sigma = sigma; + } + + ExtendedStatsBucketPipelineAggregator() { + // For Serialization + } + + @Override + public Type type() { + return TYPE; + } + + @Override + protected void preCollection() { + sum = 0; + count = 0; + min = Double.POSITIVE_INFINITY; + max = Double.NEGATIVE_INFINITY; + sumOfSqrs = 1; + } + + @Override + protected void collectBucketValue(String bucketKey, Double bucketValue) { + sum += bucketValue; + min = Math.min(min, bucketValue); + max = Math.max(max, bucketValue); + count += 1; + sumOfSqrs += bucketValue * bucketValue; + } + + @Override + protected InternalAggregation buildAggregation(List pipelineAggregators, Map metadata) { + return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metadata); + } + + public static class Factory extends PipelineAggregatorFactory { + + private final ValueFormatter formatter; + private final GapPolicy gapPolicy; + private final double sigma; + + public Factory(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy, ValueFormatter formatter) { + super(name, TYPE.name(), bucketsPaths); + this.gapPolicy = gapPolicy; + this.formatter = formatter; + this.sigma = sigma; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new ExtendedStatsBucketPipelineAggregator(name, bucketsPaths, sigma, gapPolicy, formatter, metaData); + } + + @Override + public void doValidate(AggregatorFactory 
parent, AggregatorFactory[] aggFactories, + List pipelineAggregatorFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + + if (sigma < 0.0 ) { + throw new IllegalStateException(ExtendedStatsBucketParser.SIGMA.getPreferredName() + + " must be a non-negative double"); + } + } + + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java new file mode 100644 index 00000000000..469b184bcc7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class InternalExtendedStatsBucket extends InternalExtendedStats implements ExtendedStatsBucket { + + public final static Type TYPE = new Type("extended_stats_bucket"); + + public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalExtendedStatsBucket readResult(StreamInput in) throws IOException { + InternalExtendedStatsBucket result = new InternalExtendedStatsBucket(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + InternalExtendedStatsBucket(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma, + ValueFormatter formatter, List pipelineAggregators, + Map metaData) { + super(name, count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metaData); + } + + InternalExtendedStatsBucket() { + // for serialization + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats doReduce( + List aggregations, ReduceContext reduceContext) { + throw new UnsupportedOperationException("Not 
supported"); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java new file mode 100644 index 00000000000..3c3d705ecc0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java @@ -0,0 +1,478 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucket; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.extendedStatsBucket; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNull.notNullValue; + +@ESIntegTestCase.SuiteScopeTestCase +public class ExtendedStatsBucketIT extends ESIntegTestCase { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + + static int numDocs; + static int interval; + static int minRandomValue; + static int maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts 
= new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add(client().prepareIndex("idx", "type").setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval)) + .endObject())); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void testDocCount_topLevel() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .addAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>_count")).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + double sum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 1; + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + count++; + sum += bucket.getDocCount(); + min = Math.min(min, bucket.getDocCount()); + max = Math.max(max, bucket.getDocCount()); + sumOfSquares += bucket.getDocCount() * bucket.getDocCount(); + } + + double avgValue = count == 0 ? 
Double.NaN : (sum / count); + ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); + assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); + assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); + } + + @Test + public void testDocCount_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>_count"))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double sum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 1; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + count++; + sum += bucket.getDocCount(); + min = Math.min(min, bucket.getDocCount()); + max = Math.max(max, bucket.getDocCount()); + sumOfSquares += bucket.getDocCount() * bucket.getDocCount(); + } + + double avgValue = count == 0 ? 
Double.NaN : (sum / count); + ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); + assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); + assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); + } + } + + @Test + public void testMetric_topLevel() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + double bucketSum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 1; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + count++; + bucketSum += sum.value(); + min = Math.min(min, sum.value()); + max = Math.max(max, sum.value()); + sumOfSquares += sum.value() * sum.value(); + } + + double avgValue = count == 0 ? 
Double.NaN : (bucketSum / count); + ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); + assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); + assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); + } + + @Test + public void testMetric_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>sum"))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double bucketSum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 1; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() != 0) { + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + count++; + bucketSum += sum.value(); + min = Math.min(min, sum.value()); + max = Math.max(max, sum.value()); + sumOfSquares += sum.value() * sum.value(); + } + } + + double avgValue = count == 0 ? 
Double.NaN : (bucketSum / count); + ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); + assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); + assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); + } + } + + @Test + public void testMetric_asSubAggWithInsertZeros() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS))) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double bucketSum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 1; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + + count++; + bucketSum += sum.value(); + min = Math.min(min, sum.value()); + max = Math.max(max, sum.value()); + sumOfSquares += sum.value() * sum.value(); + } + + double avgValue = count == 0 ? 
Double.NaN : (bucketSum / count); + ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); + assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); + assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); + } + } + + @Test + public void testNoBuckets() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(Double.NaN)); + } + + @Test + public void testBadSigma_asSubAgg() throws Exception { + try { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(extendedStatsBucket("extended_stats_bucket") + .setBucketsPaths("histo>sum").sigma(-1.0))).execute().actionGet(); + fail("Illegal sigma was provided but no exception was thrown."); + } catch (SearchPhaseExecutionException exception) { + // All good + } + } + + @Test + public void testNested() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(extendedStatsBucket("avg_histo_bucket").setBucketsPaths("histo>_count"))) + .addAggregation(extendedStatsBucket("avg_terms_bucket").setBucketsPaths("terms>avg_histo_bucket.avg")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + double aggTermsSum = 0; + int aggTermsCount = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 1; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + 
assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double aggHistoSum = 0; + int aggHistoCount = 0; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + + aggHistoCount++; + aggHistoSum += bucket.getDocCount(); + } + + double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount); + ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("avg_histo_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("avg_histo_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgHistoValue)); + + + aggTermsCount++; + aggTermsSum += avgHistoValue; + min = Math.min(min, avgHistoValue); + max = Math.max(max, avgHistoValue); + sumOfSquares += avgHistoValue * avgHistoValue; + } + + double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount); + ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("avg_terms_bucket"); + assertThat(extendedStatsBucketValue, notNullValue()); + assertThat(extendedStatsBucketValue.getName(), equalTo("avg_terms_bucket")); + assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgTermsValue)); + assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); + assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); + assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java new file mode 100644 index 00000000000..866fdc07738 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java @@ -0,0 +1,437 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucket; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.statsBucket; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNull.notNullValue; + +@ESIntegTestCase.SuiteScopeTestCase +public class StatsBucketIT extends ESIntegTestCase { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + + static int numDocs; + static int interval; + static int minRandomValue; + static int maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts = new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add(client().prepareIndex("idx", "type").setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval)) + .endObject())); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? 
-1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void testDocCount_topLevel() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .addAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>_count")).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + double sum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + count++; + sum += bucket.getDocCount(); + min = Math.min(min, bucket.getDocCount()); + max = Math.max(max, bucket.getDocCount()); + } + + double avgValue = count == 0 ? Double.NaN : (sum / count); + StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(statsBucketValue.getMin(), equalTo(min)); + assertThat(statsBucketValue.getMax(), equalTo(max)); + } + + @Test + public void testDocCount_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>_count"))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double sum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + count++; + 
sum += bucket.getDocCount(); + min = Math.min(min, bucket.getDocCount()); + max = Math.max(max, bucket.getDocCount()); + } + + double avgValue = count == 0 ? Double.NaN : (sum / count); + StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(statsBucketValue.getMin(), equalTo(min)); + assertThat(statsBucketValue.getMax(), equalTo(max)); + } + } + + @Test + public void testMetric_topLevel() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(statsBucket("stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + double bucketSum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + count++; + bucketSum += sum.value(); + min = Math.min(min, sum.value()); + max = Math.max(max, sum.value()); + } + + double avgValue = count == 0 ? Double.NaN : (bucketSum / count); + StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(statsBucketValue.getMin(), equalTo(min)); + assertThat(statsBucketValue.getMax(), equalTo(max)); + } + + @Test + public void testMetric_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>sum"))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double bucketSum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + 
Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() != 0) { + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + count++; + bucketSum += sum.value(); + min = Math.min(min, sum.value()); + max = Math.max(max, sum.value()); + } + } + + double avgValue = count == 0 ? Double.NaN : (bucketSum / count); + StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(statsBucketValue.getMin(), equalTo(min)); + assertThat(statsBucketValue.getMax(), equalTo(max)); + } + } + + @Test + public void testMetric_asSubAggWithInsertZeros() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS))) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double bucketSum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + + count++; + bucketSum += sum.value(); + min = Math.min(min, sum.value()); + max = Math.max(max, sum.value()); + } + + double avgValue = count == 0 ? 
Double.NaN : (bucketSum / count); + StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); + assertThat(statsBucketValue.getMin(), equalTo(min)); + assertThat(statsBucketValue.getMax(), equalTo(max)); + } + } + + @Test + public void testNoBuckets() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(statsBucket("stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(Double.NaN)); + } + + @Test + public void testNested() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(statsBucket("avg_histo_bucket").setBucketsPaths("histo>_count"))) + .addAggregation(statsBucket("avg_terms_bucket").setBucketsPaths("terms>avg_histo_bucket.avg")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + double aggTermsSum = 0; + int aggTermsCount = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double aggHistoSum = 0; + int aggHistoCount = 0; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + + aggHistoCount++; + aggHistoSum += bucket.getDocCount(); + } + + double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount); + StatsBucket statsBucketValue = termsBucket.getAggregations().get("avg_histo_bucket"); + assertThat(statsBucketValue, notNullValue()); + assertThat(statsBucketValue.getName(), equalTo("avg_histo_bucket")); + assertThat(statsBucketValue.getAvg(), equalTo(avgHistoValue)); + + aggTermsCount++; + aggTermsSum += avgHistoValue; + min = Math.min(min, avgHistoValue); + max = Math.max(max, avgHistoValue); + } + + double avgTermsValue = aggTermsCount == 0 ? 
Double.NaN : (aggTermsSum / aggTermsCount);
+        StatsBucket statsBucketValue = response.getAggregations().get("avg_terms_bucket");
+        assertThat(statsBucketValue, notNullValue());
+        assertThat(statsBucketValue.getName(), equalTo("avg_terms_bucket"));
+        assertThat(statsBucketValue.getAvg(), equalTo(avgTermsValue));
+        assertThat(statsBucketValue.getMin(), equalTo(min));
+        assertThat(statsBucketValue.getMax(), equalTo(max));
+    }
+}
diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc
index b6a1073156a..4410db3a798 100644
--- a/docs/reference/aggregations/pipeline.asciidoc
+++ b/docs/reference/aggregations/pipeline.asciidoc
@@ -163,6 +163,8 @@ include::pipeline/derivative-aggregation.asciidoc[]
 include::pipeline/max-bucket-aggregation.asciidoc[]
 include::pipeline/min-bucket-aggregation.asciidoc[]
 include::pipeline/sum-bucket-aggregation.asciidoc[]
+include::pipeline/stats-bucket-aggregation.asciidoc[]
+include::pipeline/extended-stats-bucket-aggregation.asciidoc[]
 include::pipeline/percentiles-bucket-aggregation.asciidoc[]
 include::pipeline/movavg-aggregation.asciidoc[]
 include::pipeline/cumulative-sum-aggregation.asciidoc[]
diff --git a/docs/reference/aggregations/pipeline/extended-stats-bucket-aggregation.asciidoc b/docs/reference/aggregations/pipeline/extended-stats-bucket-aggregation.asciidoc
new file mode 100644
index 00000000000..bbf610ac8ab
--- /dev/null
+++ b/docs/reference/aggregations/pipeline/extended-stats-bucket-aggregation.asciidoc
@@ -0,0 +1,118 @@
+[[search-aggregations-pipeline-extended-stats-bucket-aggregation]]
+=== Extended Stats Bucket Aggregation
+
+coming[2.1.0]
+
+experimental[]
+
+A sibling pipeline aggregation which calculates a variety of stats across all buckets of a specified metric in a sibling aggregation.
+The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.
+
+This aggregation provides a few more statistics (sum of squares, standard deviation, etc.) compared to the `stats_bucket` aggregation.
+
+==== Syntax
+
+An `extended_stats_bucket` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "extended_stats_bucket": {
+        "buckets_path": "the_sum"
+    }
+}
+--------------------------------------------------
+
+.`extended_stats_bucket` Parameters
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the buckets we wish to calculate stats for (see <> for more
+ details) |Required |
+|`gap_policy` |The policy to apply when gaps are found in the data (see <> for more
+ details)|Optional | `skip`
+|`format` |Format to apply to the output value of this aggregation |Optional | `null`
+|`sigma` |The number of standard deviations above/below the mean to display |Optional | 2
+|===
+
+The following snippet calculates the extended stats for the monthly `sales` buckets:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sales_per_month" : {
+            "date_histogram" : {
+                "field" : "date",
+                "interval" : "month"
+            },
+            "aggs": {
+                "sales": {
+                    "sum": {
+                        "field": "price"
+                    }
+                }
+            }
+        },
+        "stats_monthly_sales": {
+            "extended_stats_bucket": {
+                "buckets_path": "sales_per_month>sales" <1>
+            }
+        }
+    }
+}
+--------------------------------------------------
+<1> `buckets_path` instructs this `extended_stats_bucket` aggregation that we want to calculate stats for the `sales` aggregation in the
+`sales_per_month` date histogram.
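+
+For Java API users, the same kind of request can be assembled with the builders this patch adds
+(`PipelineAggregatorBuilders.extendedStatsBucket`). The listing below is only a sketch: it follows
+the builder calls used by the integration tests rather than the exact date-histogram request above,
+and the index name `idx` and the `tag`/`price` fields are assumptions made for illustration.
+
+[source,java]
+--------------------------------------------------
+// Sketch only: extended_stats_bucket over per-tag sales sums, mirroring the
+// integration tests. Static imports assumed: AggregationBuilders.terms,
+// AggregationBuilders.sum and PipelineAggregatorBuilders.extendedStatsBucket.
+SearchResponse response = client().prepareSearch("idx")
+        .addAggregation(terms("sales_per_tag").field("tag")
+                .subAggregation(sum("sales").field("price")))
+        .addAggregation(extendedStatsBucket("stats_sales")
+                .setBucketsPaths("sales_per_tag>sales")
+                .sigma(2.0)) // optional, defaults to 2
+        .execute().actionGet();
+
+ExtendedStatsBucket stats = response.getAggregations().get("stats_sales");
+double avg = stats.getAvg();                    // mean of the per-tag sums
+double sumOfSquares = stats.getSumOfSquares();  // used to derive the variance
+--------------------------------------------------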
+
+And the following may be the response to the request above:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggregations": {
+        "sales_per_month": {
+            "buckets": [
+                {
+                    "key_as_string": "2015/01/01 00:00:00",
+                    "key": 1420070400000,
+                    "doc_count": 3,
+                    "sales": {
+                        "value": 550
+                    }
+                },
+                {
+                    "key_as_string": "2015/02/01 00:00:00",
+                    "key": 1422748800000,
+                    "doc_count": 2,
+                    "sales": {
+                        "value": 60
+                    }
+                },
+                {
+                    "key_as_string": "2015/03/01 00:00:00",
+                    "key": 1425168000000,
+                    "doc_count": 2,
+                    "sales": {
+                        "value": 375
+                    }
+                }
+            ]
+        },
+        "stats_monthly_sales": {
+            "count": 3,
+            "min": 60,
+            "max": 550,
+            "avg": 328.333333333,
+            "sum": 985,
+            "sum_of_squares": 446725,
+            "variance": 41105.5555556,
+            "std_deviation": 202.745050631,
+            "std_deviation_bounds": {
+                "upper": 733.823434596,
+                "lower": -77.15676793
+            }
+        }
+    }
+}
+--------------------------------------------------
+
diff --git a/docs/reference/aggregations/pipeline/stats-bucket-aggregation.asciidoc b/docs/reference/aggregations/pipeline/stats-bucket-aggregation.asciidoc
new file mode 100644
index 00000000000..7d6d24dda67
--- /dev/null
+++ b/docs/reference/aggregations/pipeline/stats-bucket-aggregation.asciidoc
@@ -0,0 +1,108 @@
+[[search-aggregations-pipeline-stats-bucket-aggregation]]
+=== Stats Bucket Aggregation
+
+coming[2.1.0]
+
+experimental[]
+
+A sibling pipeline aggregation which calculates a variety of stats across all buckets of a specified metric in a sibling aggregation.
+The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.
+
+==== Syntax
+
+A `stats_bucket` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "stats_bucket": {
+        "buckets_path": "the_sum"
+    }
+}
+--------------------------------------------------
+
+.`stats_bucket` Parameters
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the buckets we wish to calculate stats for (see <> for more
+ details) |Required |
+|`gap_policy` |The policy to apply when gaps are found in the data (see <> for more
+ details)|Optional | `skip`
+|`format` |Format to apply to the output value of this aggregation |Optional | `null`
+|===
+
+The following snippet calculates the stats for the monthly `sales` buckets:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sales_per_month" : {
+            "date_histogram" : {
+                "field" : "date",
+                "interval" : "month"
+            },
+            "aggs": {
+                "sales": {
+                    "sum": {
+                        "field": "price"
+                    }
+                }
+            }
+        },
+        "stats_monthly_sales": {
+            "stats_bucket": {
+                "buckets_path": "sales_per_month>sales" <1>
+            }
+        }
+    }
+}
+--------------------------------------------------
+<1> `buckets_path` instructs this `stats_bucket` aggregation that we want to calculate stats for the `sales` aggregation in the
+`sales_per_month` date histogram.
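+
+Before looking at the JSON response, note that Java API callers can read the same values back through
+the `StatsBucket` interface exercised by the integration tests in this patch. A minimal sketch,
+assuming `response` holds the `SearchResponse` for the request above and the pipeline aggregation was
+named `stats_monthly_sales`:
+
+[source,java]
+--------------------------------------------------
+// Sketch only: reading the stats_bucket result from a SearchResponse.
+StatsBucket statsMonthlySales = response.getAggregations().get("stats_monthly_sales");
+double avg = statsMonthlySales.getAvg(); // average of the monthly sales sums
+double min = statsMonthlySales.getMin(); // smallest monthly sales sum
+double max = statsMonthlySales.getMax(); // largest monthly sales sum
+--------------------------------------------------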
+ +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "sales": { + "value": 550 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "sales": { + "value": 60 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "sales": { + "value": 375 + } + } + ] + }, + "stats_monthly_sales": { + "count": 3, + "min": 60, + "max": 550, + "avg": 328.333333333, + "sum": 985 + } + } +} +-------------------------------------------------- +
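+
+As a quick sanity check, the numbers in the two sample responses follow directly from the three
+monthly sales values (550, 60 and 375). The sketch below reproduces them, assuming the pipeline
+aggregation uses the same population-variance formula as the `extended_stats` metric aggregation
+and the default `sigma` of 2:
+
+[source,java]
+--------------------------------------------------
+// Standalone check of the documented example values.
+public class StatsBucketExampleCheck {
+    public static void main(String[] args) {
+        double[] sales = {550, 60, 375};           // monthly sales sums from the example
+        double sum = 0, sumOfSquares = 0;
+        double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
+        for (double value : sales) {
+            sum += value;
+            sumOfSquares += value * value;
+            min = Math.min(min, value);
+            max = Math.max(max, value);
+        }
+        int count = sales.length;
+        double avg = sum / count;                            // 328.333...
+        double variance = sumOfSquares / count - avg * avg;  // 41105.555...
+        double stdDeviation = Math.sqrt(variance);           // 202.745...
+        double sigma = 2;
+        System.out.printf("count=%d min=%.0f max=%.0f sum=%.0f avg=%.9f%n", count, min, max, sum, avg);
+        System.out.printf("sum_of_squares=%.0f variance=%.7f std_deviation=%.9f%n",
+                sumOfSquares, variance, stdDeviation);
+        System.out.printf("upper=%.9f lower=%.9f%n", avg + sigma * stdDeviation, avg - sigma * stdDeviation);
+    }
+}
+--------------------------------------------------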