Aggregations: Pipeline Aggregation to filter buckets based on a script

This pipeline aggregation runs a script on each bucket in the parent aggregation to determine whether the bucket is kept in the final aggregation tree. If the script returns `true`, the bucket is retained; if it returns `false`, the bucket is dropped.
Colin Goodheart-Smithe 2015-06-25 14:22:15 +01:00
parent b612cab96a
commit 1d7fc6b4f2
10 changed files with 945 additions and 1 deletion
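A minimal usage sketch of the builder API this commit introduces, distilled from the tests below; the index, field, and aggregation names are illustrative, and `client` is assumed to be an available `Client`:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService.ScriptType;

import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.having;

public class BucketSelectorExample {
    // Keep only the histogram buckets whose "sales" sum exceeds 100. The
    // script sees the first buckets path as _value0 and must return a boolean.
    public static SearchResponse filteredHistogram(Client client) {
        return client.prepareSearch("idx")
                .addAggregation(
                        histogram("histo").field("price").interval(10)
                                .subAggregation(sum("sales").field("sales"))
                                .subAggregation(
                                        having("sales_filter")
                                                .setBucketsPaths("sales")
                                                .script(new Script("_value0 > 100", ScriptType.INLINE, null, null))))
                .get();
    }
}
--------------------------------------------------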


@@ -64,6 +64,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucke
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumParser;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser;
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule;
@@ -118,6 +119,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
pipelineAggParsers.add(MovAvgParser.class);
pipelineAggParsers.add(CumulativeSumParser.class);
pipelineAggParsers.add(BucketScriptParser.class);
pipelineAggParsers.add(BucketSelectorParser.class);
}
/**


@@ -69,6 +69,7 @@ import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSu
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule;
@@ -131,6 +132,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams();
}
@Override


@@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucke
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder;
public final class PipelineAggregatorBuilders {
@@ -61,6 +62,10 @@ public final class PipelineAggregatorBuilders {
return new BucketScriptBuilder(name);
}
public static final BucketSelectorBuilder having(String name) {
return new BucketSelectorBuilder(name);
}
public static final CumulativeSumBuilder cumulativeSum(String name) {
return new CumulativeSumBuilder(name);
}


@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.having;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.Script.ScriptField;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
import java.io.IOException;
import java.util.Map;
public class BucketSelectorBuilder extends PipelineAggregatorBuilder<BucketSelectorBuilder> {
private GapPolicy gapPolicy;
private Script script;
private Map<String, String> bucketsPathsMap;
public BucketSelectorBuilder(String name) {
super(name, BucketSelectorPipelineAggregator.TYPE.name());
}
public BucketSelectorBuilder script(Script script) {
this.script = script;
return this;
}
public BucketSelectorBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}
/**
* Sets the paths to the buckets to use for this pipeline aggregator. The
* map given to this method must contain script variable name as keys with
* bucket paths values to the metrics to use for each variable.
*/
public BucketSelectorBuilder setBucketsPathsMap(Map<String, String> bucketsPathsMap) {
this.bucketsPathsMap = bucketsPathsMap;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params builderParams) throws IOException {
if (script != null) {
builder.field(ScriptField.SCRIPT.getPreferredName(), script);
}
if (gapPolicy != null) {
builder.field(BucketSelectorParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
if (bucketsPathsMap != null) {
builder.field(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName(), bucketsPathsMap);
}
return builder;
}
}
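For named script variables, the `setBucketsPathsMap` method shown above can be used instead of the positional `_value0`-style names. A short sketch, mirroring the `inlineScriptNamedVars` test later in this commit (the aggregation and path names are illustrative):

[source,java]
--------------------------------------------------
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorBuilder;

public class NamedVarsExample {
    // Map readable script variable names to bucket paths so the script can
    // say "totalSales" instead of "_value0".
    public static BucketSelectorBuilder profitableOnly() {
        Map<String, String> bucketsPathsMap = new HashMap<>();
        bucketsPathsMap.put("totalSales", "total_sales");
        bucketsPathsMap.put("totalCosts", "total_costs");
        return new BucketSelectorBuilder("profitable_only")
                .setBucketsPathsMap(bucketsPathsMap)
                .script(new Script("totalSales > totalCosts", ScriptType.INLINE, null, null));
    }
}
--------------------------------------------------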


@@ -0,0 +1,119 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.having;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.Script.ScriptField;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BucketSelectorParser implements PipelineAggregator.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField PARAMS_FIELD = new ParseField("params");
@Override
public String type() {
return BucketSelectorPipelineAggregator.TYPE.name();
}
@Override
public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
Script script = null;
String currentFieldName = null;
Map<String, String> bucketsPathsMap = null;
GapPolicy gapPolicy = GapPolicy.SKIP;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put("_value", parser.text());
} else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else if (context.parseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) {
script = Script.parse(parser, context.parseFieldMatcher());
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPathsMap = new HashMap<>();
for (int i = 0; i < paths.size(); i++) {
bucketsPathsMap.put("_value" + i, paths.get(i));
}
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (context.parseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) {
script = Script.parse(parser, context.parseFieldMatcher());
} else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
Map<String, Object> map = parser.map();
bucketsPathsMap = new HashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
bucketsPathsMap.put(entry.getKey(), String.valueOf(entry.getValue()));
}
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
parser.getTokenLocation());
}
}
if (bucketsPathsMap == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for bucket_selector aggregation [" + reducerName + "]", parser.getTokenLocation());
}
if (script == null) {
throw new SearchParseException(context, "Missing required field [" + ScriptField.SCRIPT.getPreferredName()
+ "] for bucket_selector aggregation [" + reducerName + "]", parser.getTokenLocation());
}
return new BucketSelectorPipelineAggregator.Factory(reducerName, bucketsPathsMap, script, gapPolicy);
}
}
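The parser accepts `buckets_path` in three forms: a single string (bound to the script variable `_value`), an array (bound to `_value0`, `_value1`, ...), and an object (whose keys become the variable names). A standalone sketch of the resulting maps, with illustrative path values:

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketsPathFormsDemo {
    public static void main(String[] args) {
        // "buckets_path": "the_sum"  ->  one variable named "_value"
        Map<String, String> single = new HashMap<>();
        single.put("_value", "the_sum");

        // "buckets_path": ["field2Sum", "field3Sum"]  ->  "_value0", "_value1", ...
        List<String> paths = Arrays.asList("field2Sum", "field3Sum");
        Map<String, String> indexed = new LinkedHashMap<>();
        for (int i = 0; i < paths.size(); i++) {
            indexed.put("_value" + i, paths.get(i));
        }

        // "buckets_path": {"my_var1": "the_sum"}  ->  keys used verbatim
        Map<String, String> named = new HashMap<>();
        named.put("my_var1", "the_sum");

        System.out.println(single);
        System.out.println(indexed);
        System.out.println(named);
    }
}
--------------------------------------------------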


@@ -0,0 +1,164 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.having;
import com.google.common.base.Function;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.expression.ExpressionScriptEngineService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class BucketSelectorPipelineAggregator extends PipelineAggregator {
public final static Type TYPE = new Type("bucket_selector");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public BucketSelectorPipelineAggregator readResult(StreamInput in) throws IOException {
BucketSelectorPipelineAggregator result = new BucketSelectorPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}
private static final Function<Aggregation, InternalAggregation> FUNCTION = new Function<Aggregation, InternalAggregation>() {
@Override
public InternalAggregation apply(Aggregation input) {
return (InternalAggregation) input;
}
};
private GapPolicy gapPolicy;
private Script script;
private Map<String, String> bucketsPathsMap;
public BucketSelectorPipelineAggregator() {
}
public BucketSelectorPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, GapPolicy gapPolicy,
Map<String, Object> metadata) {
super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata);
this.bucketsPathsMap = bucketsPathsMap;
this.script = script;
this.gapPolicy = gapPolicy;
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg = (InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends Bucket> buckets = originalAgg.getBuckets();
CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS);
List newBuckets = new ArrayList<>();
for (Bucket bucket : buckets) {
Map<String, Object> vars = new HashMap<>();
if (script.getParams() != null) {
vars.putAll(script.getParams());
}
for (Map.Entry<String, String> entry : bucketsPathsMap.entrySet()) {
String varName = entry.getKey();
String bucketsPath = entry.getValue();
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
vars.put(varName, value);
}
ExecutableScript executableScript = reduceContext.scriptService().executable(compiledScript, vars);
Object scriptReturnValue = executableScript.run();
final boolean keepBucket;
if (ExpressionScriptEngineService.NAME.equals(script.getLang())) {
double scriptDoubleValue = (double) scriptReturnValue;
keepBucket = scriptDoubleValue == 1.0;
} else {
keepBucket = (boolean) scriptReturnValue;
}
if (keepBucket) {
newBuckets.add(bucket);
}
}
return originalAgg.create(newBuckets);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
script.writeTo(out);
gapPolicy.writeTo(out);
out.writeGenericValue(bucketsPathsMap);
}
@SuppressWarnings("unchecked")
@Override
protected void doReadFrom(StreamInput in) throws IOException {
script = Script.readScript(in);
gapPolicy = GapPolicy.readFrom(in);
bucketsPathsMap = (Map<String, String>) in.readGenericValue();
}
public static class Factory extends PipelineAggregatorFactory {
private Script script;
private GapPolicy gapPolicy;
private Map<String, String> bucketsPathsMap;
public Factory(String name, Map<String, String> bucketsPathsMap, Script script, GapPolicy gapPolicy) {
super(name, TYPE.name(), bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]));
this.bucketsPathsMap = bucketsPathsMap;
this.script = script;
this.gapPolicy = gapPolicy;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new BucketSelectorPipelineAggregator(name, bucketsPathsMap, script, gapPolicy, metaData);
}
}
}
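The one subtlety in `reduce` above is the `expression` script language, which can only return numbers; the aggregator therefore treats a return value of exactly 1.0 as `true`. A minimal standalone restatement of that conversion:

[source,java]
--------------------------------------------------
public class KeepBucketDemo {
    // Expression scripts return doubles, so 1.0 means "keep this bucket";
    // every other language is expected to return a Boolean directly.
    static boolean keepBucket(Object scriptReturnValue, boolean isExpressionLang) {
        if (isExpressionLang) {
            return ((Number) scriptReturnValue).doubleValue() == 1.0;
        }
        return (Boolean) scriptReturnValue;
    }

    public static void main(String[] args) {
        System.out.println(keepBucket(1.0, true));            // true
        System.out.println(keepBucket(0.0, true));            // false
        System.out.println(keepBucket(Boolean.TRUE, false));  // true
    }
}
--------------------------------------------------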


@@ -0,0 +1,468 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.having;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class BucketSelectorTests extends ElasticsearchIntegrationTest {
private static final String FIELD_1_NAME = "field1";
private static final String FIELD_2_NAME = "field2";
private static final String FIELD_3_NAME = "field3";
private static final String FIELD_4_NAME = "field4";
private static int interval;
private static int numDocs;
private static int minNumber;
private static int maxNumber;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
interval = randomIntBetween(1, 50);
numDocs = randomIntBetween(10, 500);
minNumber = -200;
maxNumber = 200;
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int docs = 0; docs < numDocs; docs++) {
builders.add(client().prepareIndex("idx", "type").setSource(newDocBuilder()));
}
client().preparePutIndexedScript().setId("my_script").setScriptLang(GroovyScriptEngineService.NAME)
.setSource("{ \"script\": \"Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)\" }").get();
indexRandom(true, builders);
ensureSearchable();
}
private XContentBuilder newDocBuilder() throws IOException {
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(FIELD_1_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_2_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_3_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_4_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.endObject();
return jsonBuilder;
}
@Test
public void inlineScript() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)", ScriptType.INLINE,
null, null)))).execute()
.actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
@Test
public void inlineScriptNoBucketsPruned() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? true : (_value0 < 10000)", ScriptType.INLINE, null,
null)))).execute()
.actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, lessThan(10000.0));
}
}
@Test
public void inlineScriptNoBucketsLeft() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? false : (_value0 > 10000)", ScriptType.INLINE, null,
null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(0));
}
@Test
public void inlineScript2() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? false : (_value0 < _value1)", ScriptType.INLINE, null,
null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field3SumValue - field2SumValue, greaterThan(0.0));
}
}
@Test
public void inlineScriptSingleVariable() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum")
.script(new Script("Double.isNaN(_value0) ? false : (_value0 > 100)", ScriptType.INLINE,
null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
assertThat(field2SumValue, greaterThan(100.0));
}
}
@Test
public void inlineScriptNamedVars() {
Map<String, String> bucketPathsMap = new HashMap<>();
bucketPathsMap.put("my_value1", "field2Sum");
bucketPathsMap.put("my_value2", "field3Sum");
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPathsMap(bucketPathsMap).script(
new Script("Double.isNaN(my_value1) ? false : (my_value1 + my_value2 > 100)",
ScriptType.INLINE, null, null)))).execute()
.actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
@Test
public void inlineScriptWithParams() {
Map<String, Object> params = new HashMap<>();
params.put("threshold", 100);
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > threshold)",
ScriptType.INLINE, null, params)))).execute()
.actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
@Test
public void inlineScriptInsertZeros() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").gapPolicy(GapPolicy.INSERT_ZEROS)
.script(new Script("_value0 + _value1 > 100", ScriptType.INLINE, null, null))))
.execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
@Test
public void indexedScript() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("my_script", ScriptType.INDEXED, null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
@Test
public void unmapped() throws Exception {
SearchResponse response = client()
.prepareSearch("idx_unmapped")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)", ScriptType.INLINE,
null, null)))).execute()
.actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
assertThat(histo.getBuckets().size(), equalTo(0));
}
@Test
public void partiallyUnmapped() throws Exception {
SearchResponse response = client()
.prepareSearch("idx", "idx_unmapped")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(
having("having").setBucketsPaths("field2Sum", "field3Sum").script(
new Script("Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)", ScriptType.INLINE,
null, null)))).execute()
.actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
}


@@ -162,3 +162,4 @@ include::pipeline/sum-bucket-aggregation.asciidoc[]
include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/cumulative-sum-aggregation.asciidoc[]
include::pipeline/bucket-script-aggregation.asciidoc[]
include::pipeline/bucket-selector-aggregation.asciidoc[]


@@ -72,7 +72,7 @@ The following snippet calculates the ratio percentage of t-shirt sales compared
}
},
"t-shirt-percentage": {
"series_arithmetic": {
"bucket_script": {
"buckets_paths": {
"tShirtSales": "t-shirts>sales",
"totalSales": "total_sales"


@@ -0,0 +1,107 @@
[[search-aggregations-pipeline-bucket-selector-aggregation]]
=== Bucket Selector Aggregation
coming[2.0.0]
experimental[]
A parent pipeline aggregation which executes a script to determine whether the current bucket will be retained
in the parent multi-bucket aggregation. The specified metric must be numeric and the script must return a boolean value.
If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false`
and all other values will evaluate to `true`.
Note: The `bucket_selector` aggregation, like all pipeline aggregations, executes after all other sibling aggregations. This means
that using the `bucket_selector` aggregation to filter the returned buckets in the response does not save on the execution time of
running those aggregations.
==== Syntax
A `bucket_selector` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
    "bucket_selector": {
        "buckets_path": {
            "my_var1": "the_sum", <1>
            "my_var2": "the_value_count"
        },
        "script": "my_var1 > my_var2"
    }
}
--------------------------------------------------
<1> Here, `my_var1` is the name of the variable for this bucket path to use in the script, and `the_sum` is the path to
the metric to use for that variable.
.`bucket_selector` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`script` |The script to run for this aggregation. The script can be inline, file, or indexed (see <<modules-scripting>>
for more details) |Required |
|`buckets_path` |A map of script variables and their associated paths to the buckets to use for those variables
(see <<bucket-path-syntax>> for more details) |Required |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
details) |Optional, defaults to `skip` |
|===
The following snippet only retains buckets where the total sales for the month are less than or equal to 50:
[source,js]
--------------------------------------------------
{
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "interval" : "month"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "price"
                    }
                },
                "sales_bucket_filter": {
                    "bucket_selector": {
                        "buckets_path": {
                            "totalSales": "total_sales"
                        },
                        "script": "totalSales <= 50"
                    }
                }
            }
        }
    }
}
--------------------------------------------------
And the following may be the response:
[source,js]
--------------------------------------------------
{
    "aggregations": {
        "sales_per_month": {
            "buckets": [
                {
                    "key_as_string": "2015/01/01 00:00:00",
                    "key": 1420070400000,
                    "doc_count": 3,
                    "total_sales": {
                        "value": 50
                    }
                },<1>
                {
                    "key_as_string": "2015/03/01 00:00:00",
                    "key": 1425168000000,
                    "doc_count": 2,
                    "total_sales": {
                        "value": 40
                    }
                }
            ]
        }
    }
}
--------------------------------------------------
<1> The bucket for `2015/02/01 00:00:00` has been removed, as its total sales exceeded 50.