From 1d7fc6b4f2050294a4a60c55dfbe74db674ded3b Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Thu, 25 Jun 2015 14:22:15 +0100 Subject: [PATCH] Aggregations: Pipeline Aggregation to filter buckets based on a script This pipeline aggregation runs a script on each bucket in the parent aggregation to determine whether the bucket is kept in the final aggregation tree. If the script returns true the bucket is retained, if it returns false the bucket is dropped --- .../aggregations/AggregationModule.java | 2 + .../TransportAggregationModule.java | 2 + .../pipeline/PipelineAggregatorBuilders.java | 5 + .../having/BucketSelectorBuilder.java | 76 +++ .../pipeline/having/BucketSelectorParser.java | 119 +++++ .../BucketSelectorPipelineAggregator.java | 164 ++++++ .../pipeline/BucketSelectorTests.java | 468 ++++++++++++++++++ docs/reference/aggregations/pipeline.asciidoc | 1 + .../bucket-script-aggregation.asciidoc | 2 +- .../bucket-selector-aggregation.asciidoc | 107 ++++ 10 files changed, 945 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorParser.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorPipelineAggregator.java create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorTests.java create mode 100644 docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index 6efb9f72f39..f90cdbc3438 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -64,6 +64,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucke import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumParser; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser; +import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorParser; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule; @@ -118,6 +119,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ pipelineAggParsers.add(MovAvgParser.class); pipelineAggParsers.add(CumulativeSumParser.class); pipelineAggParsers.add(BucketScriptParser.class); + pipelineAggParsers.add(BucketSelectorParser.class); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index daab25eeaee..ef152ab11a2 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -69,6 +69,7 @@ import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSu import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; +import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule; @@ -131,6 +132,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); + BucketSelectorPipelineAggregator.registerStreams(); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index 851b750e778..a800dff3ee6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucke import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder; +import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder; public final class PipelineAggregatorBuilders { @@ -61,6 +62,10 @@ public final class PipelineAggregatorBuilders { return new BucketScriptBuilder(name); } + public static final BucketSelectorBuilder having(String name) { + return new BucketSelectorBuilder(name); + } + public static final CumulativeSumBuilder cumulativeSum(String name) { return new CumulativeSumBuilder(name); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorBuilder.java new file mode 100644 index 00000000000..c291c632016 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.having; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.Script.ScriptField; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; + +import java.io.IOException; +import java.util.Map; + +public class BucketSelectorBuilder extends PipelineAggregatorBuilder { + + private GapPolicy gapPolicy; + private Script script; + private Map bucketsPathsMap; + + public BucketSelectorBuilder(String name) { + super(name, BucketSelectorPipelineAggregator.TYPE.name()); + } + + public BucketSelectorBuilder script(Script script) { + this.script = script; + return this; + } + + public BucketSelectorBuilder gapPolicy(GapPolicy gapPolicy) { + this.gapPolicy = gapPolicy; + return this; + } + + /** + * Sets the paths to the buckets to use for this pipeline aggregator. The + * map given to this method must contain script variable name as keys with + * bucket paths values to the metrics to use for each variable. + */ + public BucketSelectorBuilder setBucketsPathsMap(Map bucketsPathsMap) { + this.bucketsPathsMap = bucketsPathsMap; + return this; + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params builderParams) throws IOException { + if (script != null) { + builder.field(ScriptField.SCRIPT.getPreferredName(), script); + } + if (gapPolicy != null) { + builder.field(BucketSelectorParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + } + if (bucketsPathsMap != null) { + builder.field(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName(), bucketsPathsMap); + } + return builder; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorParser.java new file mode 100644 index 00000000000..e2623b52364 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorParser.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.having; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.Script.ScriptField; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BucketSelectorParser implements PipelineAggregator.Parser { + + public static final ParseField FORMAT = new ParseField("format"); + public static final ParseField GAP_POLICY = new ParseField("gap_policy"); + public static final ParseField PARAMS_FIELD = new ParseField("params"); + + @Override + public String type() { + return BucketSelectorPipelineAggregator.TYPE.name(); + } + + @Override + public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException { + XContentParser.Token token; + Script script = null; + String currentFieldName = null; + Map bucketsPathsMap = null; + GapPolicy gapPolicy = GapPolicy.SKIP; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { + bucketsPathsMap = new HashMap<>(); + bucketsPathsMap.put("_value", parser.text()); + } else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) { + gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); + } else if (context.parseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) { + script = Script.parse(parser, context.parseFieldMatcher()); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { + List paths = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + String path = parser.text(); + paths.add(path); + } + bucketsPathsMap = new HashMap<>(); + for (int i = 0; i < paths.size(); i++) { + bucketsPathsMap.put("_value" + i, paths.get(i)); + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (context.parseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) { + script = Script.parse(parser, context.parseFieldMatcher()); + } else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { + Map map = parser.map(); + bucketsPathsMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + bucketsPathsMap.put(entry.getKey(), String.valueOf(entry.getValue())); + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].", + parser.getTokenLocation()); + } + } + + if (bucketsPathsMap == null) { + throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() + + "] for bucket_selector aggregation [" + reducerName + "]", parser.getTokenLocation()); + } + + if (script == null) { + throw new SearchParseException(context, "Missing required field [" + ScriptField.SCRIPT.getPreferredName() + + "] for bucket_selector aggregation [" + reducerName + "]", parser.getTokenLocation()); + } + + return new BucketSelectorPipelineAggregator.Factory(reducerName, bucketsPathsMap, script, gapPolicy); + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorPipelineAggregator.java new file mode 100644 index 00000000000..154a729d046 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/having/BucketSelectorPipelineAggregator.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.having; + +import com.google.common.base.Function; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.script.CompiledScript; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.expression.ExpressionScriptEngineService; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +public class BucketSelectorPipelineAggregator extends PipelineAggregator { + + public final static Type TYPE = new Type("bucket_selector"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public BucketSelectorPipelineAggregator readResult(StreamInput in) throws IOException { + BucketSelectorPipelineAggregator result = new BucketSelectorPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + } + + private static final Function FUNCTION = new Function() { + @Override + public InternalAggregation apply(Aggregation input) { + return (InternalAggregation) input; + } + }; + + private GapPolicy gapPolicy; + + private Script script; + + private Map bucketsPathsMap; + + public BucketSelectorPipelineAggregator() { + } + + public BucketSelectorPipelineAggregator(String name, Map bucketsPathsMap, Script script, GapPolicy gapPolicy, + Map metadata) { + super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata); + this.bucketsPathsMap = bucketsPathsMap; + this.script = script; + this.gapPolicy = gapPolicy; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalMultiBucketAggregation originalAgg = (InternalMultiBucketAggregation) aggregation; + List buckets = originalAgg.getBuckets(); + + CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS); + List newBuckets = new ArrayList<>(); + for (Bucket bucket : buckets) { + Map vars = new HashMap<>(); + if (script.getParams() != null) { + vars.putAll(script.getParams()); + } + for (Map.Entry entry : bucketsPathsMap.entrySet()) { + String varName = entry.getKey(); + String bucketsPath = entry.getValue(); + Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy); + vars.put(varName, value); + } + ExecutableScript executableScript = reduceContext.scriptService().executable(compiledScript, vars); + Object scriptReturnValue = executableScript.run(); + final boolean keepBucket; + if (ExpressionScriptEngineService.NAME.equals(script.getLang())) { + double scriptDoubleValue = (double) scriptReturnValue; + keepBucket = scriptDoubleValue == 1.0; + } else { + keepBucket = (boolean) scriptReturnValue; + } + if (keepBucket) { + newBuckets.add(bucket); + } + } + return originalAgg.create(newBuckets); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + script.writeTo(out); + gapPolicy.writeTo(out); + out.writeGenericValue(bucketsPathsMap); + } + + @SuppressWarnings("unchecked") + @Override + protected void doReadFrom(StreamInput in) throws IOException { + script = Script.readScript(in); + gapPolicy = GapPolicy.readFrom(in); + bucketsPathsMap = (Map) in.readGenericValue(); + } + + public static class Factory extends PipelineAggregatorFactory { + + private Script script; + private GapPolicy gapPolicy; + private Map bucketsPathsMap; + + public Factory(String name, Map bucketsPathsMap, Script script, GapPolicy gapPolicy) { + super(name, TYPE.name(), bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()])); + this.bucketsPathsMap = bucketsPathsMap; + this.script = script; + this.gapPolicy = gapPolicy; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new BucketSelectorPipelineAggregator(name, bucketsPathsMap, script, gapPolicy, metaData); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorTests.java new file mode 100644 index 00000000000..bd8db68959f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorTests.java @@ -0,0 +1,468 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService.ScriptType; +import org.elasticsearch.script.groovy.GroovyScriptEngineService; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.having; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; + +@ElasticsearchIntegrationTest.SuiteScopeTest +public class BucketSelectorTests extends ElasticsearchIntegrationTest { + + private static final String FIELD_1_NAME = "field1"; + private static final String FIELD_2_NAME = "field2"; + private static final String FIELD_3_NAME = "field3"; + private static final String FIELD_4_NAME = "field4"; + + private static int interval; + private static int numDocs; + private static int minNumber; + private static int maxNumber; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + interval = randomIntBetween(1, 50); + numDocs = randomIntBetween(10, 500); + minNumber = -200; + maxNumber = 200; + + List builders = new ArrayList<>(); + for (int docs = 0; docs < numDocs; docs++) { + builders.add(client().prepareIndex("idx", "type").setSource(newDocBuilder())); + } + + client().preparePutIndexedScript().setId("my_script").setScriptLang(GroovyScriptEngineService.NAME) + .setSource("{ \"script\": \"Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)\" }").get(); + + indexRandom(true, builders); + ensureSearchable(); + } + + private XContentBuilder newDocBuilder() throws IOException { + XContentBuilder jsonBuilder = jsonBuilder(); + jsonBuilder.startObject(); + jsonBuilder.field(FIELD_1_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.field(FIELD_2_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.field(FIELD_3_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.field(FIELD_4_NAME, randomIntBetween(minNumber, maxNumber)); + jsonBuilder.endObject(); + return jsonBuilder; + } + + @Test + public void inlineScript() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)", ScriptType.INLINE, + null, null)))).execute() + .actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, greaterThan(100.0)); + } + } + + @Test + public void inlineScriptNoBucketsPruned() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? true : (_value0 < 10000)", ScriptType.INLINE, null, + null)))).execute() + .actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, lessThan(10000.0)); + } + } + + @Test + public void inlineScriptNoBucketsLeft() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? false : (_value0 > 10000)", ScriptType.INLINE, null, + null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + } + + @Test + public void inlineScript2() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? false : (_value0 < _value1)", ScriptType.INLINE, null, + null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field3SumValue - field2SumValue, greaterThan(0.0)); + } + } + + @Test + public void inlineScriptSingleVariable() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum") + .script(new Script("Double.isNaN(_value0) ? false : (_value0 > 100)", ScriptType.INLINE, + null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + assertThat(field2SumValue, greaterThan(100.0)); + } + } + + @Test + public void inlineScriptNamedVars() { + + Map bucketPathsMap = new HashMap<>(); + bucketPathsMap.put("my_value1", "field2Sum"); + bucketPathsMap.put("my_value2", "field3Sum"); + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPathsMap(bucketPathsMap).script( + new Script("Double.isNaN(my_value1) ? false : (my_value1 + my_value2 > 100)", + ScriptType.INLINE, null, null)))).execute() + .actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, greaterThan(100.0)); + } + } + + @Test + public void inlineScriptWithParams() { + + Map params = new HashMap<>(); + params.put("threshold", 100); + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > threshold)", + ScriptType.INLINE, null, params)))).execute() + .actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, greaterThan(100.0)); + } + } + + @Test + public void inlineScriptInsertZeros() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").gapPolicy(GapPolicy.INSERT_ZEROS) + .script(new Script("_value0 + _value1 > 100", ScriptType.INLINE, null, null)))) + .execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, greaterThan(100.0)); + } + } + + @Test + public void indexedScript() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("my_script", ScriptType.INDEXED, null, null)))).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, greaterThan(100.0)); + } + } + + @Test + public void unmapped() throws Exception { + + SearchResponse response = client() + .prepareSearch("idx_unmapped") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)", ScriptType.INLINE, + null, null)))).execute() + .actionGet(); + + assertSearchResponse(response); + + InternalHistogram deriv = response.getAggregations().get("histo"); + assertThat(deriv, notNullValue()); + assertThat(deriv.getName(), equalTo("histo")); + assertThat(deriv.getBuckets().size(), equalTo(0)); + } + + @Test + public void partiallyUnmapped() throws Exception { + + SearchResponse response = client() + .prepareSearch("idx", "idx_unmapped") + .addAggregation( + histogram("histo") + .field(FIELD_1_NAME) + .interval(interval) + .subAggregation(sum("field2Sum").field(FIELD_2_NAME)) + .subAggregation(sum("field3Sum").field(FIELD_3_NAME)) + .subAggregation( + having("having").setBucketsPaths("field2Sum", "field3Sum").script( + new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)", ScriptType.INLINE, + null, null)))).execute() + .actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + for (int i = 0; i < buckets.size(); ++i) { + Histogram.Bucket bucket = buckets.get(i); + Sum field2Sum = bucket.getAggregations().get("field2Sum"); + assertThat(field2Sum, notNullValue()); + double field2SumValue = field2Sum.getValue(); + Sum field3Sum = bucket.getAggregations().get("field3Sum"); + assertThat(field3Sum, notNullValue()); + double field3SumValue = field3Sum.getValue(); + assertThat(field2SumValue + field3SumValue, greaterThan(100.0)); + } + } +} diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index b445cbb0de1..3f3e6b88024 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -162,3 +162,4 @@ include::pipeline/sum-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] +include::pipeline/bucket-selector-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/bucket-script-aggregation.asciidoc b/docs/reference/aggregations/pipeline/bucket-script-aggregation.asciidoc index 6e5cb6d0c2a..3618138d6ef 100644 --- a/docs/reference/aggregations/pipeline/bucket-script-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/bucket-script-aggregation.asciidoc @@ -72,7 +72,7 @@ The following snippet calculates the ratio percentage of t-shirt sales compared } }, "t-shirt-percentage": { - "series_arithmetic": { + "bucket_script": { "buckets_paths": { "tShirtSales": "t-shirts>sales", "totalSales": "total_sales" diff --git a/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc b/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc new file mode 100644 index 00000000000..ac29f21bdd9 --- /dev/null +++ b/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc @@ -0,0 +1,107 @@ +[[search-aggregations-pipeline-bucket-selector-aggregation]] +=== Bucket Selector Aggregation + +coming[2.0.0] + +experimental[] + +A parent pipeline aggregation which executes a script which determines whether the current bucket will be retained +in the parent multi-bucket aggregation. The specified metric must be numeric and the script must return a boolean value. +If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false` +and all other values will evaluate to true. + +Note: The bucket_selector aggregation, like all pipeline aggregations, executions after all other sibling aggregations. This means that +using the bucket_selector aggregation to filter the returned buckets in the response does not save on execution time running the aggregations. + +==== Syntax + +A `bucket_selector` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "bucket_selector": { + "buckets_path": { + "my_var1": "the_sum", <1> + "my_var2": "the_value_count" + }, + script: "my_var1 > my_var2" + } +} +-------------------------------------------------- +<1> Here, `my_var1` is the name of the variable for this buckets path to use in the script, `the_sum` is the path to +the metrics to use for that variable. + + +.`bucket_selector` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`script` |The script to run for this aggregation. The script can be inline, file or indexed. (see <> +for more details) |Required | +|`buckets_path` |A map of script variables and their associated path to the buckets we wish to use for the variable +(see <> for more details) |Required | + |`gap_policy` |The policy to apply when gaps are found in the data (see <> for more + details)|Optional, defaults to `skip` | +|=== + +The following snippet only retains buckets where the total sales for the month is less than or equal to 50: + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "sales_per_month" : { + "date_histogram" : { + "field" : "date", + "interval" : "month" + }, + "aggs": { + "total_sales": { + "sum": { + "field": "price" + } + } + "sales_bucket_filter": { + "bucket_selector": { + "buckets_paths": { + "totalSales": "total_sales" + }, + "script": "totalSales <= 50" + } + } + } + } + } +} +-------------------------------------------------- + +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "total_sales": { + "value": 50 + } + },<1> + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "total_sales": { + "value": 40 + }, + } + ] + } + } +} +-------------------------------------------------- +<1> Bucket for `2015/02/01 00:00:00` has been removed as its total sales exceeded 50