Aggregations: Makes SKIP Gap Policy work correctly for Bucket Script aggregation
This change means that when the skip gap policy is used, the bucket script aggregation will skip executing the script on a bucket if any of the required bucket_paths are missing for the bucket. No aggregation will be added to the bucket, and the aggregation will move to the next bucket.
This commit is contained in:
parent
737440b580
commit
ab80130c10
|
@ -112,28 +112,38 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
|
|||
if (script.getParams() != null) {
|
||||
vars.putAll(script.getParams());
|
||||
}
|
||||
boolean skipBucket = false;
|
||||
for (Map.Entry<String, String> entry : bucketsPathsMap.entrySet()) {
|
||||
String varName = entry.getKey();
|
||||
String bucketsPath = entry.getValue();
|
||||
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
|
||||
if (GapPolicy.SKIP == gapPolicy && (value == null || Double.isNaN(value))) {
|
||||
skipBucket = true;
|
||||
break;
|
||||
}
|
||||
vars.put(varName, value);
|
||||
}
|
||||
if (skipBucket) {
|
||||
newBuckets.add(bucket);
|
||||
} else {
|
||||
ExecutableScript executableScript = reduceContext.scriptService().executable(compiledScript, vars);
|
||||
Object returned = executableScript.run();
|
||||
if (returned == null) {
|
||||
newBuckets.add(bucket);
|
||||
} else {
|
||||
if (!(returned instanceof Number)) {
|
||||
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name() + "] must return a Number");
|
||||
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name()
|
||||
+ "] must return a Number");
|
||||
}
|
||||
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION));
|
||||
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter, new ArrayList<PipelineAggregator>(),
|
||||
metaData()));
|
||||
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter,
|
||||
new ArrayList<PipelineAggregator>(), metaData()));
|
||||
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
|
||||
(InternalMultiBucketAggregation.InternalBucket) bucket);
|
||||
newBuckets.add(newBucket);
|
||||
}
|
||||
}
|
||||
}
|
||||
return originalAgg.create(newBuckets);
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorB
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@ElasticsearchIntegrationTest.SuiteScopeTest
|
||||
public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
||||
|
@ -119,9 +120,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertTrue(Double.isNaN(seriesArithmeticValue));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
@ -167,9 +166,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertTrue(Double.isNaN(seriesArithmeticValue));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
@ -213,9 +210,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertTrue(Double.isNaN(seriesArithmeticValue));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
@ -259,9 +254,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertTrue(Double.isNaN(seriesArithmeticValue));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
@ -309,9 +302,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertTrue(Double.isNaN(seriesArithmeticValue));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
@ -392,7 +383,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
.subAggregation(sum("field4Sum").field(FIELD_4_NAME))
|
||||
.subAggregation(
|
||||
bucketScript("seriesArithmetic").setBucketsPaths("field2Sum", "field3Sum", "field4Sum").script(
|
||||
new Script("my_script", ScriptType.INDEXED, null, null)).gapPolicy(GapPolicy.INSERT_ZEROS))).execute().actionGet();
|
||||
new Script("my_script", ScriptType.INDEXED, null, null)))).execute().actionGet();
|
||||
|
||||
assertSearchResponse(response);
|
||||
|
||||
|
@ -405,9 +396,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertThat(seriesArithmeticValue, equalTo(0.0));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
@ -476,9 +465,7 @@ public class BucketScriptTests extends ElasticsearchIntegrationTest {
|
|||
Histogram.Bucket bucket = buckets.get(i);
|
||||
if (bucket.getDocCount() == 0) {
|
||||
SimpleValue seriesArithmetic = bucket.getAggregations().get("seriesArithmetic");
|
||||
assertThat(seriesArithmetic, notNullValue());
|
||||
double seriesArithmeticValue = seriesArithmetic.value();
|
||||
assertTrue(Double.isNaN(seriesArithmeticValue));
|
||||
assertThat(seriesArithmetic, nullValue());
|
||||
} else {
|
||||
Sum field2Sum = bucket.getAggregations().get("field2Sum");
|
||||
assertThat(field2Sum, notNullValue());
|
||||
|
|
Loading…
Reference in New Issue