SCRIPTING: Move Aggregation Scripts to their own context (#32068)

* SCRIPTING: Move Aggregation Scripts to their own context
This commit is contained in:
Armin Braun 2018-08-04 10:37:07 +02:00 committed by GitHub
parent 6ca24e13af
commit 6fa7016bbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 254 additions and 53 deletions

View File

@ -34,10 +34,12 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.GeoPointFieldMapper.GeoPointFieldType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.GeoPointFieldMapper.GeoPointFieldType;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.BucketAggregationScript;
import org.elasticsearch.script.BucketAggregationSelectorScript;
import org.elasticsearch.script.ClassPermission;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.FilterScript;
@ -54,6 +56,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -112,6 +115,17 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE
} else if (context.instanceClazz.equals(ExecutableScript.class)) {
ExecutableScript.Factory factory = (p) -> new ExpressionExecutableScript(expr, p);
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(BucketAggregationScript.class)) {
return context.factoryClazz.cast(newBucketAggregationScriptFactory(expr));
} else if (context.instanceClazz.equals(BucketAggregationSelectorScript.class)) {
BucketAggregationScript.Factory factory = newBucketAggregationScriptFactory(expr);
BucketAggregationSelectorScript.Factory wrappedFactory = parameters -> new BucketAggregationSelectorScript(parameters) {
@Override
public boolean execute() {
return factory.newInstance(getParams()).execute() == 1.0;
}
};
return context.factoryClazz.cast(wrappedFactory);
} else if (context.instanceClazz.equals(FilterScript.class)) {
FilterScript.Factory factory = (p, lookup) -> newFilterScript(expr, lookup, p);
return context.factoryClazz.cast(factory);
@ -122,6 +136,37 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE
throw new IllegalArgumentException("expression engine does not know how to handle script context [" + context.name + "]");
}
/**
 * Creates a {@link BucketAggregationScript.Factory} that evaluates the given expression.
 *
 * Each script instance produced by the factory owns one replaceable constant per expression
 * variable. On every {@code execute()} call the script parameters are copied into those
 * constants and the expression is evaluated against them.
 *
 * @param expr the compiled expression to evaluate
 * @return a factory producing scripts whose {@code execute()} throws
 *         {@link IllegalArgumentException} when a parameter name is not a variable of the
 *         expression, or when a parameter value is not a {@link Number}
 */
private static BucketAggregationScript.Factory newBucketAggregationScriptFactory(Expression expr) {
    return parameters -> {
        // The same placeholder objects are reachable by position (for evaluation)
        // and by variable name (for binding parameter values).
        final int variableCount = expr.variables.length;
        final ReplaceableConstDoubleValues[] slots = new ReplaceableConstDoubleValues[variableCount];
        final Map<String, ReplaceableConstDoubleValues> slotsByName = new HashMap<>();
        int position = 0;
        for (String variable : expr.variables) {
            final ReplaceableConstDoubleValues slot = new ReplaceableConstDoubleValues();
            slots[position++] = slot;
            slotsByName.put(variable, slot);
        }
        return new BucketAggregationScript(parameters) {
            @Override
            public double execute() {
                for (Map.Entry<String, Object> binding : getParams().entrySet()) {
                    final String name = binding.getKey();
                    final Object value = binding.getValue();
                    final ReplaceableConstDoubleValues slot = slotsByName.get(name);
                    // Unknown variable names are rejected before the value type is inspected.
                    if (slot == null) {
                        throw new IllegalArgumentException("Error using " + expr + ". " +
                            "The variable [" + name + "] does not exist in the executable expressions script.");
                    }
                    if (!(value instanceof Number)) {
                        throw new IllegalArgumentException("Error using " + expr + ". " +
                            "Executable expressions scripts can only process numbers." +
                            " The variable [" + name + "] is not a number.");
                    }
                    slot.setValue(((Number) value).doubleValue());
                }
                return expr.evaluate(slots);
            }
        };
    };
}
private SearchScript.LeafFactory newSearchScript(Expression expr, SearchLookup lookup, @Nullable Map<String, Object> vars) {
MapperService mapper = lookup.doc().mapperService();
// NOTE: if we need to do anything complicated with bindings in the future, we can just extend Bindings,
@ -267,7 +312,7 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE
};
};
}
private ScoreScript.LeafFactory newScoreScript(Expression expr, SearchLookup lookup, @Nullable Map<String, Object> vars) {
SearchScript.LeafFactory searchLeafFactory = newSearchScript(expr, lookup, vars);
return new ScoreScript.LeafFactory() {
@ -284,17 +329,17 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE
public double execute() {
return script.runAsDouble();
}
@Override
public void setDocument(int docid) {
script.setDocument(docid);
}
@Override
public void setScorer(Scorer scorer) {
script.setScorer(scorer);
}
@Override
public double get_score() {
return script.getScore();

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import java.util.Map;
/**
 * Base class for bucket aggregation scripts that produce a {@code double} result.
 */
public abstract class BucketAggregationScript {

    /** Parameter names declared for this script type; empty here. */
    public static final String[] PARAMETERS = {};

    /** The script context for this script type, registered under the name {@code bucket_aggregation}. */
    public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("bucket_aggregation", Factory.class);

    /**
     * Creates script instances bound to a set of runtime parameters.
     */
    public interface Factory {
        BucketAggregationScript newInstance(Map<String, Object> params);
    }

    /** Runtime parameters handed to the script at construction time. */
    private final Map<String, Object> params;

    /**
     * @param params the generic runtime parameters for the script; stored as-is, not copied
     */
    public BucketAggregationScript(Map<String, Object> params) {
        this.params = params;
    }

    /**
     * Returns the parameter map that was passed to the constructor.
     */
    public Map<String, Object> getParams() {
        return params;
    }

    /**
     * Runs the script.
     *
     * @return the script result as a {@code double}
     */
    public abstract double execute();
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import java.util.Map;
/**
 * Base class for bucket aggregation scripts that produce a {@code boolean} result.
 */
public abstract class BucketAggregationSelectorScript {

    /** Parameter names declared for this script type; empty here. */
    public static final String[] PARAMETERS = {};

    /** The script context for this script type, registered under the name {@code aggregation_selector}. */
    public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("aggregation_selector", Factory.class);

    /**
     * Creates script instances bound to a set of runtime parameters.
     */
    public interface Factory {
        BucketAggregationSelectorScript newInstance(Map<String, Object> params);
    }

    /** Runtime parameters handed to the script at construction time. */
    private final Map<String, Object> params;

    /**
     * @param params the generic runtime parameters for the script; stored as-is, not copied
     */
    public BucketAggregationSelectorScript(Map<String, Object> params) {
        this.params = params;
    }

    /**
     * Returns the parameter map that was passed to the constructor.
     */
    public Map<String, Object> getParams() {
        return params;
    }

    /**
     * Runs the script.
     *
     * @return the script result as a {@code boolean}
     */
    public abstract boolean execute();
}

View File

@ -48,6 +48,5 @@ public interface ExecutableScript {
ScriptContext<Factory> CONTEXT = new ScriptContext<>("executable", Factory.class);
// TODO: remove these once each has its own script interface
ScriptContext<Factory> AGGS_CONTEXT = new ScriptContext<>("aggs_executable", Factory.class);
ScriptContext<Factory> UPDATE_CONTEXT = new ScriptContext<>("update", Factory.class);
}

View File

@ -46,7 +46,9 @@ public class ScriptModule {
SearchScript.SCRIPT_SORT_CONTEXT,
SearchScript.TERMS_SET_QUERY_CONTEXT,
ExecutableScript.CONTEXT,
ExecutableScript.AGGS_CONTEXT,
BucketAggregationScript.CONTEXT,
BucketAggregationSelectorScript.CONTEXT,
SignificantTermsHeuristicScoreScript.CONTEXT,
ExecutableScript.UPDATE_CONTEXT,
IngestScript.CONTEXT,
FilterScript.CONTEXT,

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import java.util.Map;
/**
 * Base class for scripts used in significant terms heuristic scoring.
 */
public abstract class SignificantTermsHeuristicScoreScript {

    /** Parameter names declared for this script type: the single {@code params} argument of {@link #execute(Map)}. */
    public static final String[] PARAMETERS = { "params" };

    /** The script context for this script type, registered under the name {@code script_heuristic}. */
    public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("script_heuristic", Factory.class);

    /**
     * Creates script instances; no arguments are needed because parameters are
     * supplied on each {@link #execute(Map)} call instead.
     */
    public interface Factory {
        SignificantTermsHeuristicScoreScript newInstance();
    }

    /**
     * Runs the script against the supplied parameter map.
     *
     * @param params the values made available to the script for this invocation
     * @return the script result as a {@code double}
     */
    public abstract double execute(Map<String, Object> params);
}

View File

@ -28,12 +28,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.SignificantTermsHeuristicScoreScript;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class ScriptHeuristic extends SignificanceHeuristic {
@ -48,19 +50,21 @@ public class ScriptHeuristic extends SignificanceHeuristic {
private final LongAccessor supersetSizeHolder;
private final LongAccessor subsetDfHolder;
private final LongAccessor supersetDfHolder;
private final ExecutableScript executableScript;
private final SignificantTermsHeuristicScoreScript executableScript;
private final Map<String, Object> params = new HashMap<>();
ExecutableScriptHeuristic(Script script, ExecutableScript executableScript){
ExecutableScriptHeuristic(Script script, SignificantTermsHeuristicScoreScript executableScript) {
super(script);
subsetSizeHolder = new LongAccessor();
supersetSizeHolder = new LongAccessor();
subsetDfHolder = new LongAccessor();
supersetDfHolder = new LongAccessor();
this.executableScript = executableScript;
executableScript.setNextVar("_subset_freq", subsetDfHolder);
executableScript.setNextVar("_subset_size", subsetSizeHolder);
executableScript.setNextVar("_superset_freq", supersetDfHolder);
executableScript.setNextVar("_superset_size", supersetSizeHolder);
params.putAll(script.getParams());
params.put("_subset_freq", subsetDfHolder);
params.put("_subset_size", subsetSizeHolder);
params.put("_superset_freq", supersetDfHolder);
params.put("_superset_size", supersetSizeHolder);
}
@Override
@ -69,7 +73,7 @@ public class ScriptHeuristic extends SignificanceHeuristic {
supersetSizeHolder.value = supersetSize;
subsetDfHolder.value = subsetFreq;
supersetDfHolder.value = supersetFreq;
return ((Number) executableScript.run()).doubleValue();
return executableScript.execute(params);
}
}
@ -91,15 +95,15 @@ public class ScriptHeuristic extends SignificanceHeuristic {
@Override
public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext context) {
ExecutableScript.Factory factory = context.scriptService().compile(script, ExecutableScript.AGGS_CONTEXT);
return new ExecutableScriptHeuristic(script, factory.newInstance(script.getParams()));
SignificantTermsHeuristicScoreScript.Factory factory = context.scriptService().compile(script, SignificantTermsHeuristicScoreScript.CONTEXT);
return new ExecutableScriptHeuristic(script, factory.newInstance());
}
@Override
public SignificanceHeuristic rewrite(SearchContext context) {
QueryShardContext shardContext = context.getQueryShardContext();
ExecutableScript.Factory compiledScript = shardContext.getScriptService().compile(script, ExecutableScript.AGGS_CONTEXT);
return new ExecutableScriptHeuristic(script, compiledScript.newInstance(script.getParams()));
SignificantTermsHeuristicScoreScript.Factory compiledScript = shardContext.getScriptService().compile(script, SignificantTermsHeuristicScoreScript.CONTEXT);
return new ExecutableScriptHeuristic(script, compiledScript.newInstance());
}

View File

@ -21,10 +21,9 @@ package org.elasticsearch.search.aggregations.pipeline.bucketscript;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.BucketAggregationScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -89,7 +88,8 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
ExecutableScript.Factory factory = reduceContext.scriptService().compile(script, ExecutableScript.AGGS_CONTEXT);
BucketAggregationScript.Factory factory =
reduceContext.scriptService().compile(script, BucketAggregationScript.CONTEXT);
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>();
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
Map<String, Object> vars = new HashMap<>();
@ -110,24 +110,13 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
if (skipBucket) {
newBuckets.add(bucket);
} else {
ExecutableScript executableScript = factory.newInstance(vars);
Object returned = executableScript.run();
// no need to check for self references since only numbers are valid
if (returned == null) {
newBuckets.add(bucket);
} else {
if ((returned instanceof Number) == false) {
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name()
+ "] must return a Number");
}
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
(p) -> (InternalAggregation) p).collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter,
new ArrayList<>(), metaData()));
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
}
double returned = factory.newInstance(vars).execute();
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
(p) -> (InternalAggregation) p).collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), returned, formatter, new ArrayList<>(), metaData()));
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
}
}
return originalAgg.create(newBuckets);

View File

@ -22,7 +22,7 @@ package org.elasticsearch.search.aggregations.pipeline.bucketselector;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.BucketAggregationSelectorScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
@ -82,7 +82,8 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
ExecutableScript.Factory factory = reduceContext.scriptService().compile(script, ExecutableScript.AGGS_CONTEXT);
BucketAggregationSelectorScript.Factory factory =
reduceContext.scriptService().compile(script, BucketAggregationSelectorScript.CONTEXT);
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>();
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
Map<String, Object> vars = new HashMap<>();
@ -96,17 +97,8 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
vars.put(varName, value);
}
// TODO: can we use one instance of the script for all buckets? it should be stateless?
ExecutableScript executableScript = factory.newInstance(vars);
Object scriptReturnValue = executableScript.run();
final boolean keepBucket;
// TODO: WTF!!!!!
if ("expression".equals(script.getLang())) {
double scriptDoubleValue = (double) scriptReturnValue;
keepBucket = scriptDoubleValue == 1.0;
} else {
keepBucket = (boolean) scriptReturnValue;
}
if (keepBucket) {
BucketAggregationSelectorScript executableScript = factory.newInstance(vars);
if (executableScript.execute()) {
newBuckets.add(bucket);
}
}

View File

@ -96,6 +96,30 @@ public class MockScriptEngine implements ScriptEngine {
}
};
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(BucketAggregationScript.class)) {
BucketAggregationScript.Factory factory = parameters -> new BucketAggregationScript(parameters) {
@Override
public double execute() {
return ((Number) script.apply(getParams())).doubleValue();
}
};
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(BucketAggregationSelectorScript.class)) {
BucketAggregationSelectorScript.Factory factory = parameters -> new BucketAggregationSelectorScript(parameters) {
@Override
public boolean execute() {
return (boolean) script.apply(getParams());
}
};
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(SignificantTermsHeuristicScoreScript.class)) {
SignificantTermsHeuristicScoreScript.Factory factory = () -> new SignificantTermsHeuristicScoreScript() {
@Override
public double execute(Map<String, Object> vars) {
return ((Number) script.apply(vars)).doubleValue();
}
};
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(TemplateScript.class)) {
TemplateScript.Factory factory = vars -> {
// TODO: need a better way to implement all these new contexts