Aggregations fix: scripted heuristics for scoring significant_terms aggs were not thread safe when running local to the coordinating node. New code spawns an object for each shard search execution rather than sharing a common instance which is not thread safe.

Closes #18120
This commit is contained in:
markharwood 2016-10-25 15:00:54 +01:00
parent f3e578f942
commit 9944a594b1
4 changed files with 57 additions and 42 deletions

View File

@ -197,13 +197,13 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
}
}
getSignificanceHeuristic().initialize(reduceContext);
SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
final int size = Math.min(requiredSize, buckets.size());
BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
List<B> sameTermBuckets = entry.getValue();
final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
b.updateScore(getSignificanceHeuristic());
b.updateScore(heuristic);
if ((b.score > 0) && (b.subsetDf >= minDocCount)) {
ordered.insertWithOverflow(b);
}

View File

@ -217,9 +217,9 @@ public class SignificantTermsAggregationBuilder extends ValuesSourceAggregationB
@Override
protected ValuesSourceAggregatorFactory<ValuesSource, ?> innerBuild(AggregationContext context, ValuesSourceConfig<ValuesSource> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
this.significanceHeuristic.initialize(context.searchContext());
SignificanceHeuristic executionHeuristic = this.significanceHeuristic.rewrite(context.searchContext());
return new SignificantTermsAggregatorFactory(name, type, config, includeExclude, executionHint, filterBuilder,
bucketCountThresholds, significanceHeuristic, context, parent, subFactoriesBuilder, metaData);
bucketCountThresholds, executionHeuristic, context, parent, subFactoriesBuilder, metaData);
}
@Override

View File

@ -24,7 +24,6 @@ package org.elasticsearch.search.aggregations.bucket.significant.heuristics;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryShardException;
@ -43,18 +42,41 @@ import java.util.Objects;
public class ScriptHeuristic extends SignificanceHeuristic {
public static final String NAME = "script_heuristic";
private final Script script;
// This class holds an executable form of the script with private variables ready for execution
// on a single search thread.
static class ExecutableScriptHeuristic extends ScriptHeuristic {
private final LongAccessor subsetSizeHolder;
private final LongAccessor supersetSizeHolder;
private final LongAccessor subsetDfHolder;
private final LongAccessor supersetDfHolder;
private final Script script;
ExecutableScript executableScript = null;
private final ExecutableScript executableScript;
public ScriptHeuristic(Script script) {
ExecutableScriptHeuristic(Script script, ExecutableScript executableScript){
super(script);
subsetSizeHolder = new LongAccessor();
supersetSizeHolder = new LongAccessor();
subsetDfHolder = new LongAccessor();
supersetDfHolder = new LongAccessor();
this.executableScript = executableScript;
executableScript.setNextVar("_subset_freq", subsetDfHolder);
executableScript.setNextVar("_subset_size", subsetSizeHolder);
executableScript.setNextVar("_superset_freq", supersetDfHolder);
executableScript.setNextVar("_superset_size", supersetSizeHolder);
}
@Override
public double getScore(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) {
subsetSizeHolder.value = subsetSize;
supersetSizeHolder.value = supersetSize;
subsetDfHolder.value = subsetFreq;
supersetDfHolder.value = supersetFreq;
return ((Number) executableScript.run()).doubleValue();
}
}
public ScriptHeuristic(Script script) {
this.script = script;
}
@ -71,22 +93,15 @@ public class ScriptHeuristic extends SignificanceHeuristic {
}
@Override
public void initialize(InternalAggregation.ReduceContext context) {
initialize(context.scriptService().executable(script, ScriptContext.Standard.AGGS, Collections.emptyMap()));
public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext context) {
return new ExecutableScriptHeuristic(script, context.scriptService().executable(script, ScriptContext.Standard.AGGS, Collections.emptyMap()));
}
@Override
public void initialize(SearchContext context) {
initialize(context.getQueryShardContext().getExecutableScript(script, ScriptContext.Standard.AGGS, Collections.emptyMap()));
public SignificanceHeuristic rewrite(SearchContext context) {
return new ExecutableScriptHeuristic(script, context.getQueryShardContext().getExecutableScript(script, ScriptContext.Standard.AGGS, Collections.emptyMap()));
}
public void initialize(ExecutableScript executableScript) {
executableScript.setNextVar("_subset_freq", subsetDfHolder);
executableScript.setNextVar("_subset_size", subsetSizeHolder);
executableScript.setNextVar("_superset_freq", supersetDfHolder);
executableScript.setNextVar("_superset_size", supersetSizeHolder);
this.executableScript = executableScript;
}
/**
* Calculates score with a script
@ -99,19 +114,7 @@ public class ScriptHeuristic extends SignificanceHeuristic {
*/
@Override
public double getScore(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) {
if (executableScript == null) {
//In tests, wehn calling assertSearchResponse(..) the response is streamed one additional time with an arbitrary version, see assertVersionSerializable(..).
// Now, for version before 1.5.0 the score is computed after streaming the response but for scripts the script does not exists yet.
// assertSearchResponse() might therefore fail although there is no problem.
// This should be replaced by an exception in 2.0.
ESLoggerFactory.getLogger("script heuristic").warn("cannot compute score - script has not been initialized yet.");
return 0;
}
subsetSizeHolder.value = subsetSize;
supersetSizeHolder.value = supersetSize;
subsetDfHolder.value = subsetFreq;
supersetDfHolder.value = supersetFreq;
return ((Number) executableScript.run()).doubleValue();
throw new UnsupportedOperationException("This scoring heuristic must have 'rewrite' called on it to provide a version ready for use");
}
@Override

View File

@ -50,11 +50,23 @@ public abstract class SignificanceHeuristic implements NamedWriteable, ToXConten
}
}
public void initialize(InternalAggregation.ReduceContext reduceContext) {
/**
* Provides a hook for subclasses to provide a version of the heuristic
* prepared for execution on data on the coordinating node.
* @param reduceContext the reduce context on the coordinating node
* @return a version of this heuristic suitable for execution
*/
public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext reduceContext) {
return this;
}
public void initialize(SearchContext context) {
/**
* Provides a hook for subclasses to provide a version of the heuristic
* prepared for execution on data on a shard.
* @param context the search context on the data node
* @return a version of this heuristic suitable for execution
*/
public SignificanceHeuristic rewrite(SearchContext context) {
return this;
}
}