From 9944a594b10d5cfa34fcf2fa8ed45ca6d52345fe Mon Sep 17 00:00:00 2001 From: markharwood Date: Tue, 25 Oct 2016 15:00:54 +0100 Subject: [PATCH] Aggregations fix: scripted heuristics for scoring significant_terms aggs were not thread safe when running local to the coordinating node. New code spawns an object for each shard search execution rather than sharing a common instance which is not thread safe. Closes #18120 --- .../significant/InternalSignificantTerms.java | 4 +- .../SignificantTermsAggregationBuilder.java | 4 +- .../heuristics/ScriptHeuristic.java | 71 ++++++++++--------- .../heuristics/SignificanceHeuristic.java | 20 ++++-- 4 files changed, 57 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index cd18386da1e..cdd1f8d19a7 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -197,13 +197,13 @@ public abstract class InternalSignificantTerms ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); - b.updateScore(getSignificanceHeuristic()); + b.updateScore(heuristic); if ((b.score > 0) && (b.subsetDf >= minDocCount)) { ordered.insertWithOverflow(b); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregationBuilder.java index 1cf422ae50a..5af538965d1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregationBuilder.java @@ -217,9 +217,9 @@ public class SignificantTermsAggregationBuilder extends ValuesSourceAggregationB @Override protected ValuesSourceAggregatorFactory innerBuild(AggregationContext context, ValuesSourceConfig config, AggregatorFactory parent, Builder subFactoriesBuilder) throws IOException { - this.significanceHeuristic.initialize(context.searchContext()); + SignificanceHeuristic executionHeuristic = this.significanceHeuristic.rewrite(context.searchContext()); return new SignificantTermsAggregatorFactory(name, type, config, includeExclude, executionHint, filterBuilder, - bucketCountThresholds, significanceHeuristic, context, parent, subFactoriesBuilder, metaData); + bucketCountThresholds, executionHeuristic, context, parent, subFactoriesBuilder, metaData); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/ScriptHeuristic.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/ScriptHeuristic.java index c854b036b00..748adb67ce5 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/ScriptHeuristic.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/ScriptHeuristic.java @@ -24,7 +24,6 @@ package org.elasticsearch.search.aggregations.bucket.significant.heuristics; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryShardException; @@ -43,18 +42,41 @@ import java.util.Objects; public class ScriptHeuristic extends SignificanceHeuristic { public static final String NAME = "script_heuristic"; - private final LongAccessor subsetSizeHolder; - private final LongAccessor supersetSizeHolder; - private final LongAccessor subsetDfHolder; - private final LongAccessor supersetDfHolder; private final Script script; - ExecutableScript executableScript = null; + + // This class holds an executable form of the script with private variables ready for execution + // on a single search thread. + static class ExecutableScriptHeuristic extends ScriptHeuristic { + private final LongAccessor subsetSizeHolder; + private final LongAccessor supersetSizeHolder; + private final LongAccessor subsetDfHolder; + private final LongAccessor supersetDfHolder; + private final ExecutableScript executableScript; + + ExecutableScriptHeuristic(Script script, ExecutableScript executableScript){ + super(script); + subsetSizeHolder = new LongAccessor(); + supersetSizeHolder = new LongAccessor(); + subsetDfHolder = new LongAccessor(); + supersetDfHolder = new LongAccessor(); + this.executableScript = executableScript; + executableScript.setNextVar("_subset_freq", subsetDfHolder); + executableScript.setNextVar("_subset_size", subsetSizeHolder); + executableScript.setNextVar("_superset_freq", supersetDfHolder); + executableScript.setNextVar("_superset_size", supersetSizeHolder); + } + + @Override + public double getScore(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) { + subsetSizeHolder.value = subsetSize; + supersetSizeHolder.value = supersetSize; + subsetDfHolder.value = subsetFreq; + supersetDfHolder.value = supersetFreq; + return ((Number) executableScript.run()).doubleValue(); + } + } public ScriptHeuristic(Script script) { - subsetSizeHolder = new LongAccessor(); - supersetSizeHolder = new LongAccessor(); - subsetDfHolder = new LongAccessor(); - supersetDfHolder = new LongAccessor(); this.script = script; } @@ -71,22 +93,15 @@ public class ScriptHeuristic extends SignificanceHeuristic { } @Override - public void initialize(InternalAggregation.ReduceContext context) { - initialize(context.scriptService().executable(script, ScriptContext.Standard.AGGS, Collections.emptyMap())); + public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext context) { + return new ExecutableScriptHeuristic(script, context.scriptService().executable(script, ScriptContext.Standard.AGGS, Collections.emptyMap())); } @Override - public void initialize(SearchContext context) { - initialize(context.getQueryShardContext().getExecutableScript(script, ScriptContext.Standard.AGGS, Collections.emptyMap())); + public SignificanceHeuristic rewrite(SearchContext context) { + return new ExecutableScriptHeuristic(script, context.getQueryShardContext().getExecutableScript(script, ScriptContext.Standard.AGGS, Collections.emptyMap())); } - public void initialize(ExecutableScript executableScript) { - executableScript.setNextVar("_subset_freq", subsetDfHolder); - executableScript.setNextVar("_subset_size", subsetSizeHolder); - executableScript.setNextVar("_superset_freq", supersetDfHolder); - executableScript.setNextVar("_superset_size", supersetSizeHolder); - this.executableScript = executableScript; - } /** * Calculates score with a script @@ -99,19 +114,7 @@ public class ScriptHeuristic extends SignificanceHeuristic { */ @Override public double getScore(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) { - if (executableScript == null) { - //In tests, wehn calling assertSearchResponse(..) the response is streamed one additional time with an arbitrary version, see assertVersionSerializable(..). - // Now, for version before 1.5.0 the score is computed after streaming the response but for scripts the script does not exists yet. - // assertSearchResponse() might therefore fail although there is no problem. - // This should be replaced by an exception in 2.0. - ESLoggerFactory.getLogger("script heuristic").warn("cannot compute score - script has not been initialized yet."); - return 0; - } - subsetSizeHolder.value = subsetSize; - supersetSizeHolder.value = supersetSize; - subsetDfHolder.value = subsetFreq; - supersetDfHolder.value = supersetFreq; - return ((Number) executableScript.run()).doubleValue(); + throw new UnsupportedOperationException("This scoring heuristic must have 'rewrite' called on it to provide a version ready for use"); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/SignificanceHeuristic.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/SignificanceHeuristic.java index db9711c1a8d..7b6cf699741 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/SignificanceHeuristic.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/heuristics/SignificanceHeuristic.java @@ -50,11 +50,23 @@ public abstract class SignificanceHeuristic implements NamedWriteable, ToXConten } } - public void initialize(InternalAggregation.ReduceContext reduceContext) { - + /** + * Provides a hook for subclasses to provide a version of the heuristic + * prepared for execution on data on the coordinating node. + * @param reduceContext the reduce context on the coordinating node + * @return a version of this heuristic suitable for execution + */ + public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext reduceContext) { + return this; } - public void initialize(SearchContext context) { - + /** + * Provides a hook for subclasses to provide a version of the heuristic + * prepared for execution on data on a shard. + * @param context the search context on the data node + * @return a version of this heuristic suitable for execution + */ + public SignificanceHeuristic rewrite(SearchContext context) { + return this; } }