From f21924ae0dd3ad5f08a85ca6947871567a0a8c6f Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Mon, 22 Jun 2015 16:30:42 +0100 Subject: [PATCH] Aggregations: Adds cumulative sum aggregation This adds a new pipeline aggregation, the cumulative sum aggregation. This is a parent aggregation which must be specified as a sub-aggregation to a histogram or date_histogram aggregation. It will add a new aggregation to each bucket containing the sum of a specified metrics over this and all previous buckets. --- .../aggregations/AggregationModule.java | 2 + .../TransportAggregationModule.java | 2 + .../pipeline/PipelineAggregatorBuilders.java | 5 + .../cumulativesum/CumulativeSumBuilder.java | 48 +++++ .../cumulativesum/CumulativeSumParser.java | 96 ++++++++++ .../CumulativeSumPipelineAggregator.java | 146 +++++++++++++++ .../pipeline/CumulativeSumTests.java | 169 ++++++++++++++++++ docs/reference/aggregations/pipeline.asciidoc | 1 + .../cumulative-sum-aggregation.asciidoc | 104 +++++++++++ 9 files changed, 573 insertions(+) create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumParser.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumTests.java create mode 100644 docs/reference/aggregations/pipeline/cumulative-sum-aggregation.asciidoc diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index 93229745496..6efb9f72f39 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -61,6 +61,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser; +import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumParser; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser; @@ -115,6 +116,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ pipelineAggParsers.add(AvgBucketParser.class); pipelineAggParsers.add(SumBucketParser.class); pipelineAggParsers.add(MovAvgParser.class); + pipelineAggParsers.add(CumulativeSumParser.class); pipelineAggParsers.add(BucketScriptParser.class); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index 864dd829498..daab25eeaee 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -65,6 +65,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; @@ -128,6 +129,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams(); + CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index 6a7181e82a9..12f02920c7d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder; +import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder; @@ -59,4 +60,8 @@ public final class PipelineAggregatorBuilders { public static final BucketScriptBuilder seriesArithmetic(String name) { return new BucketScriptBuilder(name); } + + public static final CumulativeSumBuilder cumulativeSum(String name) { + return new CumulativeSumBuilder(name); + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumBuilder.java new file mode 100644 index 00000000000..282ded8db61 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumBuilder.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.cumulativesum; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; + +import java.io.IOException; + +public class CumulativeSumBuilder extends PipelineAggregatorBuilder { + + private String format; + + public CumulativeSumBuilder(String name) { + super(name, CumulativeSumPipelineAggregator.TYPE.name()); + } + + public CumulativeSumBuilder format(String format) { + this.format = format; + return this; + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(CumulativeSumParser.FORMAT.getPreferredName(), format); + } + return builder; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumParser.java new file mode 100644 index 00000000000..7cdf7a28853 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.cumulativesum; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.support.format.ValueFormat; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class CumulativeSumParser implements PipelineAggregator.Parser { + + public static final ParseField FORMAT = new ParseField("format"); + public static final ParseField GAP_POLICY = new ParseField("gap_policy"); + public static final ParseField UNIT = new ParseField("unit"); + + @Override + public String type() { + return CumulativeSumPipelineAggregator.TYPE.name(); + } + + @Override + public PipelineAggregatorFactory parse(String pipelineAggregatorName, XContentParser parser, SearchContext context) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + String[] bucketsPaths = null; + String format = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (FORMAT.match(currentFieldName)) { + format = parser.text(); + } else if (BUCKETS_PATH.match(currentFieldName)) { + bucketsPaths = new String[] { parser.text() }; + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (BUCKETS_PATH.match(currentFieldName)) { + List paths = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + String path = parser.text(); + paths.add(path); + } + bucketsPaths = paths.toArray(new String[paths.size()]); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + pipelineAggregatorName + "].", + parser.getTokenLocation()); + } + } + + if (bucketsPaths == null) { + throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() + + "] for derivative aggregation [" + pipelineAggregatorName + "]", parser.getTokenLocation()); + } + + ValueFormatter formatter = null; + if (format != null) { + formatter = ValueFormat.Patternable.Number.format(format).formatter(); + } + + return new CumulativeSumPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter); + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java new file mode 100644 index 00000000000..c900f353bfd --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java @@ -0,0 +1,146 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.cumulativesum; + +import com.google.common.collect.Lists; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +public class CumulativeSumPipelineAggregator extends PipelineAggregator { + + public final static Type TYPE = new Type("cumulative_sum"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public CumulativeSumPipelineAggregator readResult(StreamInput in) throws IOException { + CumulativeSumPipelineAggregator result = new CumulativeSumPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + } + + private ValueFormatter formatter; + + public CumulativeSumPipelineAggregator() { + } + + public CumulativeSumPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, + Map metadata) { + super(name, bucketsPaths, metadata); + this.formatter = formatter; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalHistogram histo = (InternalHistogram) aggregation; + List buckets = histo.getBuckets(); + InternalHistogram.Factory factory = histo.getFactory(); + + List newBuckets = new ArrayList<>(); + double sum = 0; + for (InternalHistogram.Bucket bucket : buckets) { + Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.INSERT_ZEROS); + sum += thisBucketValue; + List aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), + AGGREGATION_TRANFORM_FUNCTION)); + aggs.add(new InternalSimpleValue(name(), sum, formatter, new ArrayList(), metaData())); + InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), + new InternalAggregations(aggs), bucket.getKeyed(), bucket.getFormatter()); + newBuckets.add(newBucket); + } + return factory.create(newBuckets, histo); + } + + @Override + public void doReadFrom(StreamInput in) throws IOException { + formatter = ValueFormatterStreams.readOptional(in); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(formatter, out); + } + + public static class Factory extends PipelineAggregatorFactory { + + private final ValueFormatter formatter; + + public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter) { + super(name, TYPE.name(), bucketsPaths); + this.formatter = formatter; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new CumulativeSumPipelineAggregator(name, bucketsPaths, formatter, metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List pipelineAggregatorFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + if (!(parent instanceof HistogramAggregator.Factory)) { + throw new IllegalStateException("cumulative sum aggregation [" + name + + "] must have a histogram or date_histogram as parent"); + } else { + HistogramAggregator.Factory histoParent = (HistogramAggregator.Factory) parent; + if (histoParent.minDocCount() != 0) { + throw new IllegalStateException("parent histogram of cumulative sum aggregation [" + name + + "] must have min_doc_count of 0"); + } + } + } + + } +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumTests.java new file mode 100644 index 00000000000..509de093f2b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumTests.java @@ -0,0 +1,169 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.cumulativeSum; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; + +@ElasticsearchIntegrationTest.SuiteScopeTest +public class CumulativeSumTests extends ElasticsearchIntegrationTest { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + + static int numDocs; + static int interval; + static int minRandomValue; + static int maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts = new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add(client().prepareIndex("idx", "type").setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval)) + .endObject())); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void testDocCount() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(cumulativeSum("cumulative_sum").setBucketsPaths("_count"))).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + double sum = 0; + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + sum += bucket.getDocCount(); + InternalSimpleValue cumulativeSumValue = bucket.getAggregations().get("cumulative_sum"); + assertThat(cumulativeSumValue, notNullValue()); + assertThat(cumulativeSumValue.getName(), equalTo("cumulative_sum")); + assertThat(cumulativeSumValue.value(), equalTo(sum)); + } + + } + + @Test + public void testMetric() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) + .subAggregation(cumulativeSum("cumulative_sum").setBucketsPaths("sum"))).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double bucketSum = 0; + for (int i = 0; i < buckets.size(); ++i) { + Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + bucketSum += sum.value(); + + InternalSimpleValue sumBucketValue = bucket.getAggregations().get("cumulative_sum"); + assertThat(sumBucketValue, notNullValue()); + assertThat(sumBucketValue.getName(), equalTo("cumulative_sum")); + assertThat(sumBucketValue.value(), equalTo(bucketSum)); + } + } + + @Test + public void testNoBuckets() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .setQuery(rangeQuery(SINGLE_VALUED_FIELD_NAME).lt(minRandomValue)) + .addAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) + .subAggregation(cumulativeSum("cumulative_sum").setBucketsPaths("sum"))).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + } +} diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index def73fe28d9..b445cbb0de1 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -160,4 +160,5 @@ include::pipeline/max-bucket-aggregation.asciidoc[] include::pipeline/min-bucket-aggregation.asciidoc[] include::pipeline/sum-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] +include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/cumulative-sum-aggregation.asciidoc b/docs/reference/aggregations/pipeline/cumulative-sum-aggregation.asciidoc new file mode 100644 index 00000000000..418ae88ff4f --- /dev/null +++ b/docs/reference/aggregations/pipeline/cumulative-sum-aggregation.asciidoc @@ -0,0 +1,104 @@ +[[search-aggregations-pipeline-cumulative-sum-aggregation]] +=== Cumulative Sum Aggregation + +A parent pipeline aggregation which calculates the cumulative sum of a specified metric in a parent histogram (or date_histogram) +aggregation. The specified metric must be numeric and the enclosing histogram must have `min_doc_count` set to `0` (default +for `histogram` aggregations). + +==== Syntax + +A `cumulative_sum` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "cumulative_sum": { + "buckets_path": "the_sum" + } +} +-------------------------------------------------- + +.`cumulative_sum` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`buckets_path` |The path to the buckets we wish to find the cumulative sum for (see <> for more + details) |Required | + |`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` | +|=== + +The following snippet calculates the cumulative sum of the total monthly `sales`: + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "sales_per_month" : { + "date_histogram" : { + "field" : "date", + "interval" : "month" + }, + "aggs": { + "sales": { + "sum": { + "field": "price" + } + }, + "cumulative_sales": { + "cumulative_sum": { + "buckets_paths": "sales" <1> + } + } + } + } + } +} +-------------------------------------------------- + +<1> `bucket_paths` instructs this cumulative sum aggregation to use the output of the `sales` aggregation for the cumulative sum + +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "sales": { + "value": 550 + }, + "cumulative_sales": { + "value": 550 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "sales": { + "value": 60 + }, + "cumulative_sales": { + "value": 610 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "sales": { + "value": 375 + }, + "cumulative_sales": { + "value": 985 + } + } + ] + } + } +} +--------------------------------------------------