Aggregations: Add stats_bucket / extended_stats_bucket pipeline aggregations

These are the complements to the stats/extended_stats metric aggregations, and can be used
to calculate a variety of statistics over buckets
This commit is contained in:
Zachary Tong 2015-08-26 14:20:35 -04:00
parent 242c8c0465
commit 397d5beae1
19 changed files with 1770 additions and 3 deletions

View File

@ -111,6 +111,10 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucke
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser;
@ -300,6 +304,8 @@ public class SearchModule extends AbstractModule {
multibinderPipelineAggParser.addBinding().to(MinBucketParser.class); multibinderPipelineAggParser.addBinding().to(MinBucketParser.class);
multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class); multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class);
multibinderPipelineAggParser.addBinding().to(SumBucketParser.class); multibinderPipelineAggParser.addBinding().to(SumBucketParser.class);
multibinderPipelineAggParser.addBinding().to(StatsBucketParser.class);
multibinderPipelineAggParser.addBinding().to(ExtendedStatsBucketParser.class);
multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class); multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class);
multibinderPipelineAggParser.addBinding().to(MovAvgParser.class); multibinderPipelineAggParser.addBinding().to(MovAvgParser.class);
multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class); multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class);
@ -393,7 +399,9 @@ public class SearchModule extends AbstractModule {
MinBucketPipelineAggregator.registerStreams(); MinBucketPipelineAggregator.registerStreams();
AvgBucketPipelineAggregator.registerStreams(); AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams(); StatsBucketPipelineAggregator.registerStreams();
ExtendedStatsBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
MovAvgPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams();

View File

@ -65,7 +65,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
private double sumOfSqrs; private double sumOfSqrs;
private double sigma; private double sigma;
InternalExtendedStats() {} // for serialization protected InternalExtendedStats() {} // for serialization
public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma, public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
ValueFormatter formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { ValueFormatter formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {

View File

@ -22,6 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder;
@ -56,6 +58,14 @@ public final class PipelineAggregatorBuilders {
return new SumBucketBuilder(name); return new SumBucketBuilder(name);
} }
public static final StatsBucketBuilder statsBucket(String name) {
return new StatsBucketBuilder(name);
}
public static final ExtendedStatsBucketBuilder extendedStatsBucket(String name) {
return new ExtendedStatsBucketBuilder(name);
}
public static final PercentilesBucketBuilder percentilesBucket(String name) { public static final PercentilesBucketBuilder percentilesBucket(String name) {
return new PercentilesBucketBuilder(name); return new PercentilesBucketBuilder(name);
} }

View File

@ -117,4 +117,4 @@ public abstract class BucketMetricsParser implements PipelineAggregator.Parser {
protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
ValueFormatter formatter, Map<String, Object> unparsedParams) throws ParseException; ValueFormatter formatter, Map<String, Object> unparsedParams) throws ParseException;
} }

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class InternalStatsBucket extends InternalStats implements StatsBucket {
public final static Type TYPE = new Type("stats_bucket");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalStatsBucket readResult(StreamInput in) throws IOException {
InternalStatsBucket result = new InternalStatsBucket();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public InternalStatsBucket(String name, long count, double sum, double min, double max, ValueFormatter formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, count, sum, min, max, formatter, pipelineAggregators, metaData);
}
InternalStatsBucket() {
// For serialization
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
}

View File

@ -0,0 +1,29 @@
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
/**
* Statistics over a set of buckets
*/
public interface StatsBucket extends Stats {
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder;
public class StatsBucketBuilder extends BucketMetricsBuilder<StatsBucketBuilder> {
public StatsBucketBuilder(String name) {
super(name, StatsBucketPipelineAggregator.TYPE.name());
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.util.Map;
public class StatsBucketParser extends BucketMetricsParser {
@Override
public String type() {
return StatsBucketPipelineAggregator.TYPE.name();
}
@Override
protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
ValueFormatter formatter, Map<String, Object> unparsedParams) {
return new StatsBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
public final static Type TYPE = new Type("stats_bucket");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public StatsBucketPipelineAggregator readResult(StreamInput in) throws IOException {
StatsBucketPipelineAggregator result = new StatsBucketPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
InternalStatsBucket.registerStreams();
}
private double sum = 0;
private long count = 0;
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY;
protected StatsBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter,
Map<String, Object> metaData) {
super(name, bucketsPaths, gapPolicy, formatter, metaData);
}
StatsBucketPipelineAggregator() {
// For serialization
}
@Override
public Type type() {
return TYPE;
}
@Override
protected void preCollection() {
sum = 0;
count = 0;
min = Double.POSITIVE_INFINITY;
max = Double.NEGATIVE_INFINITY;
}
@Override
protected void collectBucketValue(String bucketKey, Double bucketValue) {
sum += bucketValue;
min = Math.min(min, bucketValue);
max = Math.max(max, bucketValue);
count += 1;
}
@Override
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalStatsBucket(name(), count, sum, min, max, formatter, pipelineAggregators, metadata);
}
public static class Factory extends PipelineAggregatorFactory {
private final ValueFormatter formatter;
private final GapPolicy gapPolicy;
public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter) {
super(name, TYPE.name(), bucketsPaths);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new StatsBucketPipelineAggregator(name, bucketsPaths, gapPolicy, formatter, metaData);
}
@Override
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories,
List<PipelineAggregatorFactory> pipelineAggregatorFactories) {
if (bucketsPaths.length != 1) {
throw new IllegalStateException(Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for aggregation [" + name + "]");
}
}
}
}

View File

@ -0,0 +1,9 @@
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats;
/**
* Extended Statistics over a set of buckets
*/
public interface ExtendedStatsBucket extends ExtendedStats {
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder;
import java.io.IOException;
public class ExtendedStatsBucketBuilder extends BucketMetricsBuilder<ExtendedStatsBucketBuilder> {
Double sigma;
public ExtendedStatsBucketBuilder(String name) {
super(name, ExtendedStatsBucketPipelineAggregator.TYPE.name());
}
public ExtendedStatsBucketBuilder sigma(Double sigma) {
this.sigma = sigma;
return this;
}
@Override
protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException {
if (sigma != null) {
builder.field(ExtendedStatsBucketParser.SIGMA.getPreferredName(), sigma);
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.text.ParseException;
import java.util.Map;
public class ExtendedStatsBucketParser extends BucketMetricsParser {
static final ParseField SIGMA = new ParseField("sigma");
@Override
public String type() {
return ExtendedStatsBucketPipelineAggregator.TYPE.name();
}
@Override
protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
ValueFormatter formatter, Map<String, Object> unparsedParams) throws ParseException {
double sigma = 2.0;
Object param = unparsedParams.get(SIGMA.getPreferredName());
if (param != null) {
if (param instanceof Double) {
sigma = (Double) param;
unparsedParams.remove(SIGMA.getPreferredName());
} else {
throw new ParseException("Parameter [" + SIGMA.getPreferredName() + "] must be a Double, type `"
+ param.getClass().getSimpleName() + "` provided instead", 0);
}
}
return new ExtendedStatsBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, sigma, gapPolicy, formatter);
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
public final static Type TYPE = new Type("extended_stats_bucket");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public ExtendedStatsBucketPipelineAggregator readResult(StreamInput in) throws IOException {
ExtendedStatsBucketPipelineAggregator result = new ExtendedStatsBucketPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
InternalExtendedStatsBucket.registerStreams();
}
private double sum = 0;
private long count = 0;
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY;
private double sumOfSqrs = 1;
private double sigma;
protected ExtendedStatsBucketPipelineAggregator(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy,
ValueFormatter formatter, Map<String, Object> metaData) {
super(name, bucketsPaths, gapPolicy, formatter, metaData);
this.sigma = sigma;
}
ExtendedStatsBucketPipelineAggregator() {
// For Serialization
}
@Override
public Type type() {
return TYPE;
}
@Override
protected void preCollection() {
sum = 0;
count = 0;
min = Double.POSITIVE_INFINITY;
max = Double.NEGATIVE_INFINITY;
sumOfSqrs = 1;
}
@Override
protected void collectBucketValue(String bucketKey, Double bucketValue) {
sum += bucketValue;
min = Math.min(min, bucketValue);
max = Math.max(max, bucketValue);
count += 1;
sumOfSqrs += bucketValue * bucketValue;
}
@Override
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metadata);
}
public static class Factory extends PipelineAggregatorFactory {
private final ValueFormatter formatter;
private final GapPolicy gapPolicy;
private final double sigma;
public Factory(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy, ValueFormatter formatter) {
super(name, TYPE.name(), bucketsPaths);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
this.sigma = sigma;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new ExtendedStatsBucketPipelineAggregator(name, bucketsPaths, sigma, gapPolicy, formatter, metaData);
}
@Override
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories,
List<PipelineAggregatorFactory> pipelineAggregatorFactories) {
if (bucketsPaths.length != 1) {
throw new IllegalStateException(Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for aggregation [" + name + "]");
}
if (sigma < 0.0 ) {
throw new IllegalStateException(ExtendedStatsBucketParser.SIGMA.getPreferredName()
+ " must be a non-negative double");
}
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class InternalExtendedStatsBucket extends InternalExtendedStats implements ExtendedStatsBucket {
public final static Type TYPE = new Type("extended_stats_bucket");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalExtendedStatsBucket readResult(StreamInput in) throws IOException {
InternalExtendedStatsBucket result = new InternalExtendedStatsBucket();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalExtendedStatsBucket(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
ValueFormatter formatter, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metaData);
}
InternalExtendedStatsBucket() {
// for serialization
}
@Override
public Type type() {
return TYPE;
}
@Override
public org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats doReduce(
List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
}

View File

@ -0,0 +1,478 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.extendedStatsBucket;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.IsNull.notNullValue;
@ESIntegTestCase.SuiteScopeTestCase
public class ExtendedStatsBucketIT extends ESIntegTestCase {
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
static int numDocs;
static int interval;
static int minRandomValue;
static int maxRandomValue;
static int numValueBuckets;
static long[] valueCounts;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
numDocs = randomIntBetween(6, 20);
interval = randomIntBetween(2, 5);
minRandomValue = 0;
maxRandomValue = 20;
numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
valueCounts = new long[numValueBuckets];
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
builders.add(client().prepareIndex("idx", "type").setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
.endObject()));
final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
valueCounts[bucket]++;
}
assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
for (int i = 0; i < 2; i++) {
builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
}
indexRandom(true, builders);
ensureSearchable();
}
@Test
public void testDocCount_topLevel() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.addAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>_count")).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
double sum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
count++;
sum += bucket.getDocCount();
min = Math.min(min, bucket.getDocCount());
max = Math.max(max, bucket.getDocCount());
sumOfSquares += bucket.getDocCount() * bucket.getDocCount();
}
double avgValue = count == 0 ? Double.NaN : (sum / count);
ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue));
assertThat(extendedStatsBucketValue.getMin(), equalTo(min));
assertThat(extendedStatsBucketValue.getMax(), equalTo(max));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares));
}
@Test
public void testDocCount_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>_count"))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double sum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
count++;
sum += bucket.getDocCount();
min = Math.min(min, bucket.getDocCount());
max = Math.max(max, bucket.getDocCount());
sumOfSquares += bucket.getDocCount() * bucket.getDocCount();
}
double avgValue = count == 0 ? Double.NaN : (sum / count);
ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue));
assertThat(extendedStatsBucketValue.getMin(), equalTo(min));
assertThat(extendedStatsBucketValue.getMax(), equalTo(max));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares));
}
}
@Test
public void testMetric_topLevel() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(interval));
double bucketSum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
for (int i = 0; i < interval; ++i) {
Terms.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
assertThat(bucket.getDocCount(), greaterThan(0l));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
count++;
bucketSum += sum.value();
min = Math.min(min, sum.value());
max = Math.max(max, sum.value());
sumOfSquares += sum.value() * sum.value();
}
double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue));
assertThat(extendedStatsBucketValue.getMin(), equalTo(min));
assertThat(extendedStatsBucketValue.getMax(), equalTo(max));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares));
}
@Test
public void testMetric_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>sum"))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double bucketSum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() != 0) {
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
count++;
bucketSum += sum.value();
min = Math.min(min, sum.value());
max = Math.max(max, sum.value());
sumOfSquares += sum.value() * sum.value();
}
}
double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue));
assertThat(extendedStatsBucketValue.getMin(), equalTo(min));
assertThat(extendedStatsBucketValue.getMax(), equalTo(max));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares));
}
}
@Test
public void testMetric_asSubAggWithInsertZeros() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double bucketSum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
count++;
bucketSum += sum.value();
min = Math.min(min, sum.value());
max = Math.max(max, sum.value());
sumOfSquares += sum.value() * sum.value();
}
double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue));
assertThat(extendedStatsBucketValue.getMin(), equalTo(min));
assertThat(extendedStatsBucketValue.getMax(), equalTo(max));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares));
}
}
@Test
public void testNoBuckets() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(extendedStatsBucket("extended_stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(0));
ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(Double.NaN));
}
@Test
public void testBadSigma_asSubAgg() throws Exception {
try {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(extendedStatsBucket("extended_stats_bucket")
.setBucketsPaths("histo>sum").sigma(-1.0))).execute().actionGet();
fail("Illegal sigma was provided but no exception was thrown.");
} catch (SearchPhaseExecutionException exception) {
// All good
}
}
@Test
public void testNested() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(extendedStatsBucket("avg_histo_bucket").setBucketsPaths("histo>_count")))
.addAggregation(extendedStatsBucket("avg_terms_bucket").setBucketsPaths("terms>avg_histo_bucket.avg")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
double aggTermsSum = 0;
int aggTermsCount = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sumOfSquares = 1;
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double aggHistoSum = 0;
int aggHistoCount = 0;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
aggHistoCount++;
aggHistoSum += bucket.getDocCount();
}
double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount);
ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("avg_histo_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("avg_histo_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgHistoValue));
aggTermsCount++;
aggTermsSum += avgHistoValue;
min = Math.min(min, avgHistoValue);
max = Math.max(max, avgHistoValue);
sumOfSquares += avgHistoValue * avgHistoValue;
}
double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount);
ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("avg_terms_bucket");
assertThat(extendedStatsBucketValue, notNullValue());
assertThat(extendedStatsBucketValue.getName(), equalTo("avg_terms_bucket"));
assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgTermsValue));
assertThat(extendedStatsBucketValue.getMin(), equalTo(min));
assertThat(extendedStatsBucketValue.getMax(), equalTo(max));
assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares));
}
}

View File

@ -0,0 +1,437 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.statsBucket;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.IsNull.notNullValue;
@ESIntegTestCase.SuiteScopeTestCase
public class StatsBucketIT extends ESIntegTestCase {
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
static int numDocs;
static int interval;
static int minRandomValue;
static int maxRandomValue;
static int numValueBuckets;
static long[] valueCounts;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
numDocs = randomIntBetween(6, 20);
interval = randomIntBetween(2, 5);
minRandomValue = 0;
maxRandomValue = 20;
numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
valueCounts = new long[numValueBuckets];
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
builders.add(client().prepareIndex("idx", "type").setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
.endObject()));
final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
valueCounts[bucket]++;
}
assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
for (int i = 0; i < 2; i++) {
builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
}
indexRandom(true, builders);
ensureSearchable();
}
@Test
public void testDocCount_topLevel() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.addAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>_count")).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
double sum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
count++;
sum += bucket.getDocCount();
min = Math.min(min, bucket.getDocCount());
max = Math.max(max, bucket.getDocCount());
}
double avgValue = count == 0 ? Double.NaN : (sum / count);
StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("stats_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgValue));
assertThat(statsBucketValue.getMin(), equalTo(min));
assertThat(statsBucketValue.getMax(), equalTo(max));
}
@Test
public void testDocCount_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>_count"))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double sum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
count++;
sum += bucket.getDocCount();
min = Math.min(min, bucket.getDocCount());
max = Math.max(max, bucket.getDocCount());
}
double avgValue = count == 0 ? Double.NaN : (sum / count);
StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("stats_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgValue));
assertThat(statsBucketValue.getMin(), equalTo(min));
assertThat(statsBucketValue.getMax(), equalTo(max));
}
}
@Test
public void testMetric_topLevel() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(statsBucket("stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(interval));
double bucketSum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int i = 0; i < interval; ++i) {
Terms.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
assertThat(bucket.getDocCount(), greaterThan(0l));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
count++;
bucketSum += sum.value();
min = Math.min(min, sum.value());
max = Math.max(max, sum.value());
}
double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("stats_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgValue));
assertThat(statsBucketValue.getMin(), equalTo(min));
assertThat(statsBucketValue.getMax(), equalTo(max));
}
@Test
public void testMetric_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>sum"))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double bucketSum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() != 0) {
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
count++;
bucketSum += sum.value();
min = Math.min(min, sum.value());
max = Math.max(max, sum.value());
}
}
double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("stats_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgValue));
assertThat(statsBucketValue.getMin(), equalTo(min));
assertThat(statsBucketValue.getMax(), equalTo(max));
}
}
@Test
public void testMetric_asSubAggWithInsertZeros() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(statsBucket("stats_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double bucketSum = 0;
int count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
count++;
bucketSum += sum.value();
min = Math.min(min, sum.value());
max = Math.max(max, sum.value());
}
double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("stats_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgValue));
assertThat(statsBucketValue.getMin(), equalTo(min));
assertThat(statsBucketValue.getMax(), equalTo(max));
}
}
@Test
public void testNoBuckets() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(statsBucket("stats_bucket").setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(0));
StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("stats_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(Double.NaN));
}
@Test
public void testNested() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(statsBucket("avg_histo_bucket").setBucketsPaths("histo>_count")))
.addAggregation(statsBucket("avg_terms_bucket").setBucketsPaths("terms>avg_histo_bucket.avg")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
double aggTermsSum = 0;
int aggTermsCount = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
double aggHistoSum = 0;
int aggHistoCount = 0;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
aggHistoCount++;
aggHistoSum += bucket.getDocCount();
}
double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount);
StatsBucket statsBucketValue = termsBucket.getAggregations().get("avg_histo_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("avg_histo_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgHistoValue));
aggTermsCount++;
aggTermsSum += avgHistoValue;
min = Math.min(min, avgHistoValue);
max = Math.max(max, avgHistoValue);
}
double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount);
StatsBucket statsBucketValue = response.getAggregations().get("avg_terms_bucket");
assertThat(statsBucketValue, notNullValue());
assertThat(statsBucketValue.getName(), equalTo("avg_terms_bucket"));
assertThat(statsBucketValue.getAvg(), equalTo(avgTermsValue));
assertThat(statsBucketValue.getMin(), equalTo(min));
assertThat(statsBucketValue.getMax(), equalTo(max));
}
}

View File

@ -163,6 +163,8 @@ include::pipeline/derivative-aggregation.asciidoc[]
include::pipeline/max-bucket-aggregation.asciidoc[] include::pipeline/max-bucket-aggregation.asciidoc[]
include::pipeline/min-bucket-aggregation.asciidoc[] include::pipeline/min-bucket-aggregation.asciidoc[]
include::pipeline/sum-bucket-aggregation.asciidoc[] include::pipeline/sum-bucket-aggregation.asciidoc[]
include::pipeline/stats-bucket-aggregation.asciidoc[]
include::pipeline/extended-stats-bucket-aggregation.asciidoc[]
include::pipeline/percentiles-bucket-aggregation.asciidoc[] include::pipeline/percentiles-bucket-aggregation.asciidoc[]
include::pipeline/movavg-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/cumulative-sum-aggregation.asciidoc[]

View File

@ -0,0 +1,118 @@
[[search-aggregations-pipeline-extended-stats-bucket-aggregation]]
=== Extended Stats Bucket Aggregation
coming[2.1.0]
experimental[]
A sibling pipeline aggregation which calculates a variety of stats across all bucket of a specified metric in a sibling aggregation.
The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.
This aggregation provides a few more statistics (sum of squares, standard deviation, etc) compared to the `stats_bucket` aggregation.
==== Syntax
A `extended_stats_bucket` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"extended_stats_bucket": {
"buckets_path": "the_sum"
}
}
--------------------------------------------------
.`extended_stats_bucket` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to calculate stats for (see <<buckets-path-syntax>> for more
details) |Required |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
details)|Optional | `skip`
|`format` |format to apply to the output value of this aggregation |Optional | `null`
|`sigma` |The number of standard deviations above/below the mean to display |Optional | 2
|===
The following snippet calculates the sum of all the total monthly `sales` buckets:
[source,js]
--------------------------------------------------
{
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"stats_monthly_sales": {
"extended_stats_bucket": {
"buckets_paths": "sales_per_month>sales" <1>
}
}
}
}
--------------------------------------------------
<1> `bucket_paths` instructs this `extended_stats_bucket` aggregation that we want the calculate stats for the `sales` aggregation in the
`sales_per_month` date histogram.
And the following may be the response:
[source,js]
--------------------------------------------------
{
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375
}
}
]
},
"stats_monthly_sales": {
"count": 3,
"min": 60,
"max": 550,
"avg": 328.333333333,
"sum": 985,
"sum_of_squares": 446725,
"variance": 41105.5555556,
"std_deviation": 117.054909559,
"std_deviation_bounds": {
"upper": 562.443152451,
"lower": 94.2235142151
}
}
}
}
--------------------------------------------------

View File

@ -0,0 +1,108 @@
[[search-aggregations-pipeline-stats-bucket-aggregation]]
=== Stats Bucket Aggregation
coming[2.1.0]
experimental[]
A sibling pipeline aggregation which calculates a variety of stats across all bucket of a specified metric in a sibling aggregation.
The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.
==== Syntax
A `stats_bucket` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"stats_bucket": {
"buckets_path": "the_sum"
}
}
--------------------------------------------------
.`stats_bucket` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to calculate stats for (see <<buckets-path-syntax>> for more
details) |Required |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
details)|Optional | `skip`
|`format` |format to apply to the output value of this aggregation |Optional | `null`
|===
The following snippet calculates the sum of all the total monthly `sales` buckets:
[source,js]
--------------------------------------------------
{
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"stats_monthly_sales": {
"stats_bucket": {
"buckets_paths": "sales_per_month>sales" <1>
}
}
}
}
--------------------------------------------------
<1> `bucket_paths` instructs this `stats_bucket` aggregation that we want the calculate stats for the `sales` aggregation in the
`sales_per_month` date histogram.
And the following may be the response:
[source,js]
--------------------------------------------------
{
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375
}
}
]
},
"stats_monthly_sales": {
"count": 3,
"min": 60,
"max": 550,
"avg": 328.333333333,
"sum": 985
}
}
}
--------------------------------------------------