diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index fd6eabd15f5..8e86c8b5e37 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -64,6 +64,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucke import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule; +import org.elasticsearch.search.aggregations.pipeline.seriesarithmetic.SeriesArithmeticParser; import java.util.List; @@ -114,6 +115,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ pipelineAggParsers.add(AvgBucketParser.class); pipelineAggParsers.add(SumBucketParser.class); pipelineAggParsers.add(MovAvgParser.class); + pipelineAggParsers.add(SeriesArithmeticParser.class); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index 358abcdfda4..3a9f11f85ea 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -69,6 +69,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipel import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule; +import org.elasticsearch.search.aggregations.pipeline.seriesarithmetic.SeriesArithmeticPipelineAggregator; /** * A module that registers all the transport streams for the addAggregation @@ -127,6 +128,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams(); + SeriesArithmeticPipelineAggregator.registerStreams(); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java index b96bf4af176..16a1dc5677c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java @@ -172,7 +172,8 @@ public class BucketHelpers { value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value(); } else { throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() - + " must reference either a number value or a single value numeric metric aggregation"); + + " must reference either a number value or a single value numeric metric aggregation, got: " + + propertyValue.getClass().getCanonicalName()); } // doc count never has missing values so gap policy doesn't apply here boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0)); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index 0d3bfe1f3fd..18adc122cba 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder; +import org.elasticsearch.search.aggregations.pipeline.seriesarithmetic.SeriesArithmeticBuilder; public final class PipelineAggregatorBuilders { @@ -54,4 +55,8 @@ public final class PipelineAggregatorBuilders { public static final MovAvgBuilder movingAvg(String name) { return new MovAvgBuilder(name); } + + public static final SeriesArithmeticBuilder seriesArithmetic(String name) { + return new SeriesArithmeticBuilder(name); + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticBuilder.java new file mode 100644 index 00000000000..7171b7c2b35 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticBuilder.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.seriesarithmetic; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.Script.ScriptField; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; + +import java.io.IOException; +import java.util.Map; + +public class SeriesArithmeticBuilder extends PipelineAggregatorBuilder { + + private String format; + private GapPolicy gapPolicy; + private Script script; + private Map bucketsPathsMap; + + public SeriesArithmeticBuilder(String name) { + super(name, SeriesArithmeticPipelineAggregator.TYPE.name()); + } + + public SeriesArithmeticBuilder script(Script script) { + this.script = script; + return this; + } + + public SeriesArithmeticBuilder format(String format) { + this.format = format; + return this; + } + + public SeriesArithmeticBuilder gapPolicy(GapPolicy gapPolicy) { + this.gapPolicy = gapPolicy; + return this; + } + + /** + * Sets the paths to the buckets to use for this pipeline aggregator + */ + public SeriesArithmeticBuilder setBucketsPathsMap(Map bucketsPathsMap) { + this.bucketsPathsMap = bucketsPathsMap; + return this; + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params builderParams) throws IOException { + if (script != null) { + builder.field(ScriptField.SCRIPT.getPreferredName(), script); + } + if (format != null) { + builder.field(SeriesArithmeticParser.FORMAT.getPreferredName(), format); + } + if (gapPolicy != null) { + builder.field(SeriesArithmeticParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + } + if (bucketsPathsMap != null) { + builder.field(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName(), bucketsPathsMap); + } + return builder; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticParser.java new file mode 100644 index 00000000000..5638d5e165b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticParser.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.seriesarithmetic; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.Script.ScriptField; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.support.format.ValueFormat; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SeriesArithmeticParser implements PipelineAggregator.Parser { + + public static final ParseField FORMAT = new ParseField("format"); + public static final ParseField GAP_POLICY = new ParseField("gap_policy"); + public static final ParseField PARAMS_FIELD = new ParseField("params"); + + @Override + public String type() { + return SeriesArithmeticPipelineAggregator.TYPE.name(); + } + + @Override + public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException { + XContentParser.Token token; + Script script = null; + String currentFieldName = null; + Map bucketsPathsMap = null; + String format = null; + GapPolicy gapPolicy = GapPolicy.SKIP; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (FORMAT.match(currentFieldName)) { + format = parser.text(); + } else if (BUCKETS_PATH.match(currentFieldName)) { + bucketsPathsMap = new HashMap<>(); + bucketsPathsMap.put("_value", parser.text()); + } else if (GAP_POLICY.match(currentFieldName)) { + gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); + } else if (ScriptField.SCRIPT.match(currentFieldName)) { + script = Script.parse(parser); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (BUCKETS_PATH.match(currentFieldName)) { + List paths = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + String path = parser.text(); + paths.add(path); + } + bucketsPathsMap = new HashMap<>(); + for (int i = 0; i < paths.size(); i++) { + bucketsPathsMap.put("_value" + i, paths.get(i)); + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (ScriptField.SCRIPT.match(currentFieldName)) { + script = Script.parse(parser); + } else if (BUCKETS_PATH.match(currentFieldName)) { + Map map = parser.map(); + bucketsPathsMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + bucketsPathsMap.put(entry.getKey(), String.valueOf(entry.getValue())); + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].", + parser.getTokenLocation()); + } + } + + if (bucketsPathsMap == null) { + throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() + + "] for series_arithmetic aggregation [" + reducerName + "]", parser.getTokenLocation()); + } + + if (script == null) { + throw new SearchParseException(context, "Missing required field [" + ScriptField.SCRIPT.getPreferredName() + + "] for series_arithmetic aggregation [" + reducerName + "]", parser.getTokenLocation()); + } + + ValueFormatter formatter = null; + if (format != null) { + formatter = ValueFormat.Patternable.Number.format(format).formatter(); + } + + return new SeriesArithmeticPipelineAggregator.Factory(reducerName, bucketsPathsMap, script, formatter, gapPolicy); + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticPipelineAggregator.java new file mode 100644 index 00000000000..b2855204ce1 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/seriesarithmetic/SeriesArithmeticPipelineAggregator.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.seriesarithmetic; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.script.CompiledScript; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +public class SeriesArithmeticPipelineAggregator extends PipelineAggregator { + + public final static Type TYPE = new Type("series_arithmetic"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public SeriesArithmeticPipelineAggregator readResult(StreamInput in) throws IOException { + SeriesArithmeticPipelineAggregator result = new SeriesArithmeticPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + } + + private static final Function FUNCTION = new Function() { + @Override + public InternalAggregation apply(Aggregation input) { + return (InternalAggregation) input; + } + }; + + private ValueFormatter formatter; + private GapPolicy gapPolicy; + + private Script script; + + private Map bucketsPathsMap; + + public SeriesArithmeticPipelineAggregator() { + } + + public SeriesArithmeticPipelineAggregator(String name, Map bucketsPathsMap, Script script, @Nullable ValueFormatter formatter, + GapPolicy gapPolicy, Map metadata) { + super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata); + this.bucketsPathsMap = bucketsPathsMap; + this.script = script; + this.formatter = formatter; + this.gapPolicy = gapPolicy; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalMultiBucketAggregation originalAgg = (InternalMultiBucketAggregation) aggregation; + List buckets = originalAgg.getBuckets(); + + CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS); + List newBuckets = new ArrayList<>(); + for (Bucket bucket : buckets) { + Map vars = new HashMap<>(); + if (script.getParams() != null) { + vars.putAll(script.getParams()); + } + for (Map.Entry entry : bucketsPathsMap.entrySet()) { + String varName = entry.getKey(); + String bucketsPath = entry.getValue(); + Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy); + vars.put(varName, value); + } + ExecutableScript executableScript = reduceContext.scriptService().executable(compiledScript, vars); + Object returned = executableScript.run(); + if (returned == null) { + newBuckets.add(bucket); + } else { + if (!(returned instanceof Number)) { + throw new AggregationExecutionException("series_arithmetic script for reducer [" + name() + "] must return a Number"); + } + List aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION)); + aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter, new ArrayList(), + metaData())); + InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs), + (InternalMultiBucketAggregation.InternalBucket) bucket); + newBuckets.add(newBucket); + } + } + return originalAgg.create(newBuckets); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + script.writeTo(out); + ValueFormatterStreams.writeOptional(formatter, out); + gapPolicy.writeTo(out); + out.writeGenericValue(bucketsPathsMap); + } + + @SuppressWarnings("unchecked") + @Override + protected void doReadFrom(StreamInput in) throws IOException { + script = Script.readScript(in); + formatter = ValueFormatterStreams.readOptional(in); + gapPolicy = GapPolicy.readFrom(in); + bucketsPathsMap = (Map) in.readGenericValue(); + } + + public static class Factory extends PipelineAggregatorFactory { + + private Script script; + private final ValueFormatter formatter; + private GapPolicy gapPolicy; + private Map bucketsPathsMap; + + public Factory(String name, Map bucketsPathsMap, Script script, @Nullable ValueFormatter formatter, GapPolicy gapPolicy) { + super(name, TYPE.name(), bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()])); + this.bucketsPathsMap = bucketsPathsMap; + this.script = script; + this.formatter = formatter; + this.gapPolicy = gapPolicy; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new SeriesArithmeticPipelineAggregator(name, bucketsPathsMap, script, formatter, gapPolicy, metaData); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SeriesArithmeticTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SeriesArithmeticTests.java new file mode 100644 index 00000000000..2fbbb4d717a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SeriesArithmeticTests.java @@ -0,0 +1,499 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService.ScriptType; +import org.elasticsearch.script.groovy.GroovyScriptEngineService; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.seriesArithmetic; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +@ElasticsearchIntegrationTest.SuiteScopeTest +public class SeriesArithmeticTests extends ElasticsearchIntegrationTest { + + private static final String FIELD_1_NAME = "field1"; + private static final String FIELD_2_NAME = "field2"; + private static final String FIELD_3_NAME = "field3"; + private static final String FIELD_4_NAME = "field4"; + + private static int interval; + private static int numDocs; + private static int minNumber; + private static int maxNumber; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + interval = randomIntBetween(1, 50); + numDocs = randomIntBetween(10, 500); + minNumber = -200; + maxNumber = 200; + + List builders = new ArrayList<>(); + for (int docs = 0; docs < numDocs; docs++) { + builders.add(client().prepareIndex("idx", "type").setSource(newDocBuilder())); + } + + client().preparePutIndexedScript().setId("my_script").setScriptLang(GroovyScriptEngineService.NAME).setSource("{ \"script\": \"_value0 + _value1 + _value2\" }").get(); + + indexRandom(true, builders); + ensureSearchable(); + } + + private XContentBuilder newDocBuilder() throws IOException { + XContentBuilder jsonBuilder = jsonBuilder(); + jsonBuilder.startObject(); + jsonBuilder.field(FIELD_1_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.field(FIELD_2_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.field(FIELD_3_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.field(FIELD_4_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.endObject(); + return jsonBuilder; + } + + @Test + public void inlineScript() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertTrue(Double.isNaN(seriesArithmeticValue)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue)); + } + } + } + + @Test + public void inlineScript2() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("_value0 + _value1 / _value2", ScriptType.INLINE, null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertTrue(Double.isNaN(seriesArithmeticValue)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue / field4SumValue)); + } + } + } + + @Test + public void inlineScriptSingleVariable() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum").script( + new Script("_value0", ScriptType.INLINE, null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertTrue(Double.isNaN(seriesArithmeticValue)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue)); + } + } + } + + @Test + public void inlineScriptNamedVars() { + + Map bucketsPathsMap = new HashMap<>(); + bucketsPathsMap.put("foo", "field2Sum"); + bucketsPathsMap.put("bar", "field3Sum"); + bucketsPathsMap.put("baz", "field4Sum"); + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPathsMap(bucketsPathsMap ).script( + new Script("foo + bar + baz", ScriptType.INLINE, null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertTrue(Double.isNaN(seriesArithmeticValue)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue)); + } + } + } + + @Test + public void inlineScriptWithParams() { + + Map params = new HashMap<>(); + params.put("factor", 3); + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("(_value0 + _value1 + _value2) * factor", ScriptType.INLINE, null, params)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertTrue(Double.isNaN(seriesArithmeticValue)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo((field2SumValue + field3SumValue + field4SumValue) * 3)); + } + } + } + + @Test + public void inlineScriptInsertZeros() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)).gapPolicy(GapPolicy.INSERT_ZEROS))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(0.0)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue)); + } + } + } + + @Test + public void indexedScript() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("my_script", ScriptType.INDEXED, null, null)).gapPolicy(GapPolicy.INSERT_ZEROS))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(0.0)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue)); + } + } + } + + @Test + public void unmapped() throws Exception { + SearchResponse response = client() + .prepareSearch("idx_unmapped") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)))) + .execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram deriv = response.getAggregations().get("histo"); + assertThat(deriv, notNullValue()); + assertThat(deriv.getName(), equalTo("histo")); + assertThat(deriv.getBuckets().size(), equalTo(0)); + } + + @Test + public void partiallyUnmapped() throws Exception { + SearchResponse response = client() + .prepareSearch("idx", "idx_unmapped") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation(sum("field4Sum").field(FIELD_4_NAME)) + .subAggregation( + seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script( + new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + if (bucket.getDocCount() == 0) { + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertTrue(Double.isNaN(seriesArithmeticValue)); + } else { + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + Sum field4Sum = bucket.getAggregations().get("field4Sum"); + assertThat(field4Sum, notNullValue()); + double field4SumValue = field4Sum.getValue(); + SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic"); + assertThat(seriesArithmetic, notNullValue()); + double seriesArithmeticValue = seriesArithmetic.value(); + assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue)); + } + } + } +} diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index c991086532e..dbf63180144 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -160,3 +160,4 @@ include::pipeline/max-bucket-aggregation.asciidoc[] include::pipeline/min-bucket-aggregation.asciidoc[] include::pipeline/sum-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] +include::pipeline/series-arithmetic-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/series-arithmetic-aggregation.asciidoc b/docs/reference/aggregations/pipeline/series-arithmetic-aggregation.asciidoc new file mode 100644 index 00000000000..dc39ab326d2 --- /dev/null +++ b/docs/reference/aggregations/pipeline/series-arithmetic-aggregation.asciidoc @@ -0,0 +1,149 @@ +[[search-aggregations-pipeline-series-arithmetic-aggregation]] +=== Series Arithmetic Aggregation + +A parent pipeline aggregation which executes a script which can perform per bucket computations on specified metrics +in the parent multi-bucket aggregation. The specified metric must be numeric and the script must return a numeric value. + +==== Syntax + +A `series_arithmetic` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "series_arithmetic": { + "buckets_path": { + "my_var1": "the_sum", <1> + "my_var2": "the_value_count" + }, + script: "my_var1 / my_var2" + } +} +-------------------------------------------------- +<1> Here, `my_var1` is the name of the variable for this buckets path to use in the script, `the_sum` is the path to +the metrics to use for that variable. + + +.`series_arithmetic` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`script` |The script to run for this aggregation. The script can be inline, file or indexed. (see <> +for more details) |Required | +|`buckets_path` |A map of script variables and their associated path to the buckets we wish to use for the variable +(see <> for more details) |Required | + |`gap_policy` |The policy to apply when gaps are found in the data (see <> for more + details)|Optional, defaults to `skip` | + |`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` | +|=== + +The following snippet calculates the ratio percentage of t-shirt sales compared to total sales each month: + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "sales_per_month" : { + "date_histogram" : { + "field" : "date", + "interval" : "month" + }, + "aggs": { + "total_sales": { + "sum": { + "field": "price" + } + }, + "t-shirts": { + "filter": { + "term": { + "type": "t-shirt" + } + }, + "aggs": { + "sales": { + "sum": { + "field": "price" + } + } + } + }, + "t-shirt-percentage": { + "series_arithmetic": { + "buckets_paths": { + "tShirtSales": "t-shirts>sales", + "totalSales": "total_sales" + }, + "script": "tShirtSales / totalSales * 100" + } + } + } + } + } +} +-------------------------------------------------- + +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "total_sales": { + "value": 50 + }, + "t-shirts": { + "doc_count": 2, + "sales": { + "value": 10 + } + }, + "t-shirt-percentage": { + "value": 20 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2 + "total_sales": { + "value": 60 + }, + "t-shirts": { + "doc_count": 1, + "sales": { + "value": 15 + } + }, + "t-shirt-percentage": { + "value": 25 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "total_sales": { + "value": 40 + }, + "t-shirts": { + "doc_count": 1, + "sales": { + "value": 20 + } + }, + "t-shirt-percentage": { + "value": 50 + } + } + ] + } + } +} +-------------------------------------------------- +