Aggregations: allow users to perform simple arithmetic operations on histogram aggregations

Closes #11029
This commit is contained in:
Colin Goodheart-Smithe 2015-05-07 15:08:36 +01:00 committed by Colin Goodheart-Smithe
parent df8a3006fc
commit a216062d88
10 changed files with 1051 additions and 1 deletion

View File

@@ -64,6 +64,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucke
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule;
import org.elasticsearch.search.aggregations.pipeline.seriesarithmetic.SeriesArithmeticParser;
import java.util.List;
@@ -114,6 +115,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
pipelineAggParsers.add(AvgBucketParser.class);
pipelineAggParsers.add(SumBucketParser.class);
pipelineAggParsers.add(MovAvgParser.class);
pipelineAggParsers.add(SeriesArithmeticParser.class);
}
/**

View File

@@ -69,6 +69,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipel
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule;
import org.elasticsearch.search.aggregations.pipeline.seriesarithmetic.SeriesArithmeticPipelineAggregator;
/**
* A module that registers all the transport streams for the addAggregation
@@ -127,6 +128,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
MovAvgPipelineAggregator.registerStreams();
SeriesArithmeticPipelineAggregator.registerStreams();
}
@Override

View File

@@ -172,7 +172,8 @@ public class BucketHelpers {
value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value();
} else {
throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName()
+ " must reference either a number value or a single value numeric metric aggregation");
+ " must reference either a number value or a single value numeric metric aggregation, got: "
+ propertyValue.getClass().getCanonicalName());
}
// doc count never has missing values so gap policy doesn't apply here
boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0));

View File

@@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucke
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder;
import org.elasticsearch.search.aggregations.pipeline.seriesarithmetic.SeriesArithmeticBuilder;
public final class PipelineAggregatorBuilders {
@@ -54,4 +55,8 @@ public final class PipelineAggregatorBuilders {
public static final MovAvgBuilder movingAvg(String name) {
return new MovAvgBuilder(name);
}
public static final SeriesArithmeticBuilder seriesArithmetic(String name) {
return new SeriesArithmeticBuilder(name);
}
}

View File

@@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.seriesarithmetic;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.Script.ScriptField;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
import java.io.IOException;
import java.util.Map;
public class SeriesArithmeticBuilder extends PipelineAggregatorBuilder<SeriesArithmeticBuilder> {
private String format;
private GapPolicy gapPolicy;
private Script script;
private Map<String, String> bucketsPathsMap;
public SeriesArithmeticBuilder(String name) {
super(name, SeriesArithmeticPipelineAggregator.TYPE.name());
}
public SeriesArithmeticBuilder script(Script script) {
this.script = script;
return this;
}
public SeriesArithmeticBuilder format(String format) {
this.format = format;
return this;
}
public SeriesArithmeticBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}
/**
* Sets the paths to the buckets to use for this pipeline aggregator
*/
public SeriesArithmeticBuilder setBucketsPathsMap(Map<String, String> bucketsPathsMap) {
this.bucketsPathsMap = bucketsPathsMap;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params builderParams) throws IOException {
if (script != null) {
builder.field(ScriptField.SCRIPT.getPreferredName(), script);
}
if (format != null) {
builder.field(SeriesArithmeticParser.FORMAT.getPreferredName(), format);
}
if (gapPolicy != null) {
builder.field(SeriesArithmeticParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
if (bucketsPathsMap != null) {
builder.field(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName(), bucketsPathsMap);
}
return builder;
}
}

View File

@@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.seriesarithmetic;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.Script.ScriptField;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SeriesArithmeticParser implements PipelineAggregator.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField PARAMS_FIELD = new ParseField("params");
@Override
public String type() {
return SeriesArithmeticPipelineAggregator.TYPE.name();
}
@Override
public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
Script script = null;
String currentFieldName = null;
Map<String, String> bucketsPathsMap = null;
String format = null;
GapPolicy gapPolicy = GapPolicy.SKIP;
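// buckets_path accepts three forms: a single string (bound to the script variable "_value"),
// an array of paths (bound to "_value0", "_value1", ...), or an object mapping custom
// variable names to paths. All three are normalised into bucketsPathsMap below.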
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (FORMAT.match(currentFieldName)) {
format = parser.text();
} else if (BUCKETS_PATH.match(currentFieldName)) {
bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put("_value", parser.text());
} else if (GAP_POLICY.match(currentFieldName)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else if (ScriptField.SCRIPT.match(currentFieldName)) {
script = Script.parse(parser);
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (BUCKETS_PATH.match(currentFieldName)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPathsMap = new HashMap<>();
for (int i = 0; i < paths.size(); i++) {
bucketsPathsMap.put("_value" + i, paths.get(i));
}
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (ScriptField.SCRIPT.match(currentFieldName)) {
script = Script.parse(parser);
} else if (BUCKETS_PATH.match(currentFieldName)) {
Map<String, Object> map = parser.map();
bucketsPathsMap = new HashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
bucketsPathsMap.put(entry.getKey(), String.valueOf(entry.getValue()));
}
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
parser.getTokenLocation());
}
}
if (bucketsPathsMap == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for series_arithmetic aggregation [" + reducerName + "]", parser.getTokenLocation());
}
if (script == null) {
throw new SearchParseException(context, "Missing required field [" + ScriptField.SCRIPT.getPreferredName()
+ "] for series_arithmetic aggregation [" + reducerName + "]", parser.getTokenLocation());
}
ValueFormatter formatter = null;
if (format != null) {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
}
return new SeriesArithmeticPipelineAggregator.Factory(reducerName, bucketsPathsMap, script, formatter, gapPolicy);
}
}

View File

@@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.seriesarithmetic;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class SeriesArithmeticPipelineAggregator extends PipelineAggregator {
public final static Type TYPE = new Type("series_arithmetic");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public SeriesArithmeticPipelineAggregator readResult(StreamInput in) throws IOException {
SeriesArithmeticPipelineAggregator result = new SeriesArithmeticPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}
private static final Function<Aggregation, InternalAggregation> FUNCTION = new Function<Aggregation, InternalAggregation>() {
@Override
public InternalAggregation apply(Aggregation input) {
return (InternalAggregation) input;
}
};
private ValueFormatter formatter;
private GapPolicy gapPolicy;
private Script script;
private Map<String, String> bucketsPathsMap;
public SeriesArithmeticPipelineAggregator() {
}
public SeriesArithmeticPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, @Nullable ValueFormatter formatter,
GapPolicy gapPolicy, Map<String, Object> metadata) {
super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata);
this.bucketsPathsMap = bucketsPathsMap;
this.script = script;
this.formatter = formatter;
this.gapPolicy = gapPolicy;
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg = (InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends Bucket> buckets = originalAgg.getBuckets();
CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS);
List newBuckets = new ArrayList<>();
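// For each bucket, resolve every configured buckets_path to a value (applying the gap
// policy), bind it to its script variable, run the script, and attach the numeric result
// to the bucket as an InternalSimpleValue under this aggregator's name.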
for (Bucket bucket : buckets) {
Map<String, Object> vars = new HashMap<>();
if (script.getParams() != null) {
vars.putAll(script.getParams());
}
for (Map.Entry<String, String> entry : bucketsPathsMap.entrySet()) {
String varName = entry.getKey();
String bucketsPath = entry.getValue();
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
vars.put(varName, value);
}
ExecutableScript executableScript = reduceContext.scriptService().executable(compiledScript, vars);
Object returned = executableScript.run();
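// A null return leaves the bucket unchanged; any non-numeric return is an error.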
if (returned == null) {
newBuckets.add(bucket);
} else {
if (!(returned instanceof Number)) {
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name() + "] must return a Number");
}
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION));
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter, new ArrayList<PipelineAggregator>(),
metaData()));
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
(InternalMultiBucketAggregation.InternalBucket) bucket);
newBuckets.add(newBucket);
}
}
return originalAgg.create(newBuckets);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
script.writeTo(out);
ValueFormatterStreams.writeOptional(formatter, out);
gapPolicy.writeTo(out);
out.writeGenericValue(bucketsPathsMap);
}
@SuppressWarnings("unchecked")
@Override
protected void doReadFrom(StreamInput in) throws IOException {
script = Script.readScript(in);
formatter = ValueFormatterStreams.readOptional(in);
gapPolicy = GapPolicy.readFrom(in);
bucketsPathsMap = (Map<String, String>) in.readGenericValue();
}
public static class Factory extends PipelineAggregatorFactory {
private Script script;
private final ValueFormatter formatter;
private GapPolicy gapPolicy;
private Map<String, String> bucketsPathsMap;
public Factory(String name, Map<String, String> bucketsPathsMap, Script script, @Nullable ValueFormatter formatter, GapPolicy gapPolicy) {
super(name, TYPE.name(), bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]));
this.bucketsPathsMap = bucketsPathsMap;
this.script = script;
this.formatter = formatter;
this.gapPolicy = gapPolicy;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new SeriesArithmeticPipelineAggregator(name, bucketsPathsMap, script, formatter, gapPolicy, metaData);
}
}
}

View File

@@ -0,0 +1,499 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.seriesArithmetic;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class SeriesArithmeticTests extends ElasticsearchIntegrationTest {
private static final String FIELD_1_NAME = "field1";
private static final String FIELD_2_NAME = "field2";
private static final String FIELD_3_NAME = "field3";
private static final String FIELD_4_NAME = "field4";
private static int interval;
private static int numDocs;
private static int minNumber;
private static int maxNumber;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
interval = randomIntBetween(1, 50);
numDocs = randomIntBetween(10, 500);
minNumber = -200;
maxNumber = 200;
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int docs = 0; docs < numDocs; docs++) {
builders.add(client().prepareIndex("idx", "type").setSource(newDocBuilder()));
}
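// Store the script used by the indexedScript() test below in the cluster's script index.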
client().preparePutIndexedScript().setId("my_script").setScriptLang(GroovyScriptEngineService.NAME).setSource("{ \"script\": \"_value0 + _value1 + _value2\" }").get();
indexRandom(true, builders);
ensureSearchable();
}
private XContentBuilder newDocBuilder() throws IOException {
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(FIELD_1_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_2_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_3_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_4_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.endObject();
return jsonBuilder;
}
@Test
public void inlineScript() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertTrue(Double.isNaN(seriesArithmeticValue));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue));
}
}
}
@Test
public void inlineScript2() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("_value0 + _value1 / _value2", ScriptType.INLINE, null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertTrue(Double.isNaN(seriesArithmeticValue));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue / field4SumValue));
}
}
}
@Test
public void inlineScriptSingleVariable() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum").script(
new Script("_value0", ScriptType.INLINE, null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertTrue(Double.isNaN(seriesArithmeticValue));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue));
}
}
}
@Test
public void inlineScriptNamedVars() {
Map<String, String> bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put("foo", "field2Sum");
bucketsPathsMap.put("bar", "field3Sum");
bucketsPathsMap.put("baz", "field4Sum");
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPathsMap(bucketsPathsMap ).script(
new Script("foo + bar + baz", ScriptType.INLINE, null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertTrue(Double.isNaN(seriesArithmeticValue));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue));
}
}
}
@Test
public void inlineScriptWithParams() {
Map<String, Object> params = new HashMap<>();
params.put("factor", 3);
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("(_value0 + _value1 + _value2) * factor", ScriptType.INLINE, null, params)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertTrue(Double.isNaN(seriesArithmeticValue));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo((field2SumValue + field3SumValue + field4SumValue) * 3));
}
}
}
@Test
public void inlineScriptInsertZeros() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)).gapPolicy(GapPolicy.INSERT_ZEROS))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(0.0));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue));
}
}
}
@Test
public void indexedScript() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("my_script", ScriptType.INDEXED, null, null)).gapPolicy(GapPolicy.INSERT_ZEROS))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(0.0));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue));
}
}
}
@Test
public void unmapped() throws Exception {
SearchResponse response = client()
.prepareSearch("idx_unmapped")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null))))
.execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
assertThat(histo.getBuckets().size(), equalTo(0));
}
@Test
public void partiallyUnmapped() throws Exception {
SearchResponse response = client()
.prepareSearch("idx", "idx_unmapped")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME)
.interval(interval)
.subAggregation(sum("field2Sum").field(FIELD_2_NAME))
.subAggregation(sum("field3Sum").field(FIELD_3_NAME))
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
.subAggregation(
seriesArithmetic("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
new Script("_value0 + _value1 + _value2", ScriptType.INLINE, null, null)))).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
if (bucket.getDocCount() == 0) {
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertTrue(Double.isNaN(seriesArithmeticValue));
} else {
Sum field2Sum = bucket.getAggregations().get("field2Sum");
assertThat(field2Sum, notNullValue());
double field2SumValue = field2Sum.getValue();
Sum field3Sum = bucket.getAggregations().get("field3Sum");
assertThat(field3Sum, notNullValue());
double field3SumValue = field3Sum.getValue();
Sum field4Sum = bucket.getAggregations().get("field4Sum");
assertThat(field4Sum, notNullValue());
double field4SumValue = field4Sum.getValue();
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
assertThat(seriesArithmetic, notNullValue());
double seriesArithmeticValue = seriesArithmetic.value();
assertThat(seriesArithmeticValue, equalTo(field2SumValue + field3SumValue + field4SumValue));
}
}
}
}

View File

@@ -160,3 +160,4 @@ include::pipeline/max-bucket-aggregation.asciidoc[]
include::pipeline/min-bucket-aggregation.asciidoc[]
include::pipeline/sum-bucket-aggregation.asciidoc[]
include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/series-arithmetic-aggregation.asciidoc[]

View File

@@ -0,0 +1,149 @@
[[search-aggregations-pipeline-series-arithmetic-aggregation]]
=== Series Arithmetic Aggregation
A parent pipeline aggregation that executes a script to perform per-bucket computations on specified metrics in the
parent multi-bucket aggregation. The specified metrics must be numeric and the script must return a numeric value.
==== Syntax
A `series_arithmetic` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"series_arithmetic": {
"buckets_path": {
"my_var1": "the_sum", <1>
"my_var2": "the_value_count"
},
"script": "my_var1 / my_var2"
}
}
--------------------------------------------------
<1> Here, `my_var1` is the name of the variable for this buckets path to use in the script, and `the_sum` is the path
to the metric to use for that variable.
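
As implemented by the parser in this commit, `buckets_path` can also be given as a single string or as an array of
paths: a single string is bound to the script variable `_value`, and array entries are bound to `_value0`, `_value1`
and so on, in order. For example:

[source,js]
--------------------------------------------------
{
    "series_arithmetic": {
        "buckets_path": ["the_sum", "the_value_count"],
        "script": "_value0 / _value1"
    }
}
--------------------------------------------------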
.`series_arithmetic` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`script` |The script to run for this aggregation. The script can be inline, in a file or indexed (see
<<modules-scripting>> for more details) |Required |
|`buckets_path` |A map of script variable names to the buckets paths from which to read the values for those variables
(see <<bucket-path-syntax>> for more details) |Required |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more details) |Optional
|`skip`
|`format` |Format to apply to the output value of this aggregation |Optional |`null`
|===
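
On the Java API side, the same parameters can be set through the `SeriesArithmeticBuilder` returned by the
`seriesArithmetic` helper added to `PipelineAggregatorBuilders` in this commit. The following is a minimal sketch
mirroring the integration tests above; index, field and aggregation names are illustrative, and static imports of
`histogram`, `sum` and `count` from `AggregationBuilders` and of `seriesArithmetic` from `PipelineAggregatorBuilders`
are assumed:

[source,java]
--------------------------------------------------
// Map script variable names to buckets paths (the object form of buckets_path).
Map<String, String> bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put("my_var1", "the_sum");
bucketsPathsMap.put("my_var2", "the_count");

SearchResponse response = client()
        .prepareSearch("idx")
        .addAggregation(
                histogram("histo")
                        .field("field1")
                        .interval(5)
                        .subAggregation(sum("the_sum").field("field2"))
                        .subAggregation(count("the_count").field("field2"))
                        .subAggregation(
                                seriesArithmetic("ratio")
                                        .setBucketsPathsMap(bucketsPathsMap)
                                        .script(new Script("my_var1 / my_var2", ScriptType.INLINE, null, null))
                                        .gapPolicy(GapPolicy.INSERT_ZEROS)))
        .get();
--------------------------------------------------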
The following snippet calculates the percentage of t-shirt sales relative to total sales for each month:
[source,js]
--------------------------------------------------
{
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"total_sales": {
"sum": {
"field": "price"
}
},
"t-shirts": {
"filter": {
"term": {
"type": "t-shirt"
}
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"t-shirt-percentage": {
"series_arithmetic": {
"buckets_paths": {
"tShirtSales": "t-shirts>sales",
"totalSales": "total_sales"
},
"script": "tShirtSales / totalSales * 100"
}
}
}
}
}
}
--------------------------------------------------
And the following may be the response:
[source,js]
--------------------------------------------------
{
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"total_sales": {
"value": 50
},
"t-shirts": {
"doc_count": 2,
"sales": {
"value": 10
}
},
"t-shirt-percentage": {
"value": 20
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2
"total_sales": {
"value": 60
},
"t-shirts": {
"doc_count": 1,
"sales": {
"value": 15
}
},
"t-shirt-percentage": {
"value": 25
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"total_sales": {
"value": 40
},
"t-shirts": {
"doc_count": 1,
"sales": {
"value": 20
}
},
"t-shirt-percentage": {
"value": 50
}
}
]
}
}
}
--------------------------------------------------