mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Fixes two bugs introduced by #57627: 1. We were not properly letting go of memory from the request breaker when the aggregation finished. 2. We no longer supported totally arbitrary stuff produced by the init script because we *assumed* that it'd be ok to run the script once and clone its results. Sadly, cloning can't clone *anything* that the init script can make, like `String` arrays. This runs the init script once for every new bucket so we don't need to clone.
This commit is contained in:
parent
468e559ff7
commit
5f52bc4c9f
@ -449,7 +449,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
|
|||||||
assertThat(numShardsRun, greaterThan(0));
|
assertThat(numShardsRun, greaterThan(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitMapWithParams() {
|
public void testInitMutatesParams() {
|
||||||
Map<String, Object> varsMap = new HashMap<>();
|
Map<String, Object> varsMap = new HashMap<>();
|
||||||
varsMap.put("multiplier", 1);
|
varsMap.put("multiplier", 1);
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder
|
|||||||
compiledInitScript = queryShardContext.compile(initScript, ScriptedMetricAggContexts.InitScript.CONTEXT);
|
compiledInitScript = queryShardContext.compile(initScript, ScriptedMetricAggContexts.InitScript.CONTEXT);
|
||||||
initScriptParams = initScript.getParams();
|
initScriptParams = initScript.getParams();
|
||||||
} else {
|
} else {
|
||||||
compiledInitScript = (p, a) -> null;
|
compiledInitScript = null;
|
||||||
initScriptParams = Collections.emptyMap();
|
initScriptParams = Collections.emptyMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,12 +241,9 @@ public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder
|
|||||||
Map<String, Object> mapScriptParams = mapScript.getParams();
|
Map<String, Object> mapScriptParams = mapScript.getParams();
|
||||||
|
|
||||||
|
|
||||||
ScriptedMetricAggContexts.CombineScript.Factory compiledCombineScript;
|
ScriptedMetricAggContexts.CombineScript.Factory compiledCombineScript = queryShardContext.compile(combineScript,
|
||||||
Map<String, Object> combineScriptParams;
|
|
||||||
|
|
||||||
compiledCombineScript = queryShardContext.compile(combineScript,
|
|
||||||
ScriptedMetricAggContexts.CombineScript.CONTEXT);
|
ScriptedMetricAggContexts.CombineScript.CONTEXT);
|
||||||
combineScriptParams = combineScript.getParams();
|
Map<String, Object> combineScriptParams = combineScript.getParams();
|
||||||
|
|
||||||
return new ScriptedMetricAggregatorFactory(name, compiledMapScript, mapScriptParams, compiledInitScript,
|
return new ScriptedMetricAggregatorFactory(name, compiledMapScript, mapScriptParams, compiledInitScript,
|
||||||
initScriptParams, compiledCombineScript, combineScriptParams, reduceScript,
|
initScriptParams, compiledCombineScript, combineScriptParams, reduceScript,
|
||||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.metrics;
|
|||||||
import org.apache.lucene.index.LeafReaderContext;
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
import org.apache.lucene.search.Scorable;
|
import org.apache.lucene.search.Scorable;
|
||||||
import org.apache.lucene.search.ScoreMode;
|
import org.apache.lucene.search.ScoreMode;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.util.CollectionUtils;
|
import org.elasticsearch.common.util.CollectionUtils;
|
||||||
@ -53,7 +54,10 @@ class ScriptedMetricAggregator extends MetricsAggregator {
|
|||||||
private static final long BUCKET_COST_ESTIMATE = 1024 * 5;
|
private static final long BUCKET_COST_ESTIMATE = 1024 * 5;
|
||||||
|
|
||||||
private final SearchLookup lookup;
|
private final SearchLookup lookup;
|
||||||
private final Map<String, Object> initialState;
|
private final Map<String, Object> aggParams;
|
||||||
|
@Nullable
|
||||||
|
private final ScriptedMetricAggContexts.InitScript.Factory initScriptFactory;
|
||||||
|
private final Map<String, Object> initScriptParams;
|
||||||
private final ScriptedMetricAggContexts.MapScript.Factory mapScriptFactory;
|
private final ScriptedMetricAggContexts.MapScript.Factory mapScriptFactory;
|
||||||
private final Map<String, Object> mapScriptParams;
|
private final Map<String, Object> mapScriptParams;
|
||||||
private final ScriptedMetricAggContexts.CombineScript.Factory combineScriptFactory;
|
private final ScriptedMetricAggContexts.CombineScript.Factory combineScriptFactory;
|
||||||
@ -64,7 +68,9 @@ class ScriptedMetricAggregator extends MetricsAggregator {
|
|||||||
ScriptedMetricAggregator(
|
ScriptedMetricAggregator(
|
||||||
String name,
|
String name,
|
||||||
SearchLookup lookup,
|
SearchLookup lookup,
|
||||||
Map<String, Object> initialState,
|
Map<String, Object> aggParams,
|
||||||
|
@Nullable ScriptedMetricAggContexts.InitScript.Factory initScriptFactory,
|
||||||
|
Map<String, Object> initScriptParams,
|
||||||
ScriptedMetricAggContexts.MapScript.Factory mapScriptFactory,
|
ScriptedMetricAggContexts.MapScript.Factory mapScriptFactory,
|
||||||
Map<String, Object> mapScriptParams,
|
Map<String, Object> mapScriptParams,
|
||||||
ScriptedMetricAggContexts.CombineScript.Factory combineScriptFactory,
|
ScriptedMetricAggContexts.CombineScript.Factory combineScriptFactory,
|
||||||
@ -76,7 +82,9 @@ class ScriptedMetricAggregator extends MetricsAggregator {
|
|||||||
) throws IOException {
|
) throws IOException {
|
||||||
super(name, context, parent, metadata);
|
super(name, context, parent, metadata);
|
||||||
this.lookup = lookup;
|
this.lookup = lookup;
|
||||||
this.initialState = initialState;
|
this.aggParams = aggParams;
|
||||||
|
this.initScriptFactory = initScriptFactory;
|
||||||
|
this.initScriptParams = initScriptParams;
|
||||||
this.mapScriptFactory = mapScriptFactory;
|
this.mapScriptFactory = mapScriptFactory;
|
||||||
this.mapScriptParams = mapScriptParams;
|
this.mapScriptParams = mapScriptParams;
|
||||||
this.combineScriptFactory = combineScriptFactory;
|
this.combineScriptFactory = combineScriptFactory;
|
||||||
@ -129,39 +137,22 @@ class ScriptedMetricAggregator extends MetricsAggregator {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
|
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
|
||||||
Object result = resultFor(aggStateFor(owningBucketOrdinal));
|
Object result = aggStateForResult(owningBucketOrdinal).combine();
|
||||||
StreamOutput.checkWriteable(result);
|
StreamOutput.checkWriteable(result);
|
||||||
return new InternalScriptedMetric(name, result, reduceScript, metadata());
|
return new InternalScriptedMetric(name, result, reduceScript, metadata());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> aggStateFor(long owningBucketOrdinal) {
|
private State aggStateForResult(long owningBucketOrdinal) {
|
||||||
if (owningBucketOrdinal >= states.size()) {
|
if (owningBucketOrdinal >= states.size()) {
|
||||||
return newInitialState();
|
return new State();
|
||||||
}
|
}
|
||||||
State state = states.get(owningBucketOrdinal);
|
State state = states.get(owningBucketOrdinal);
|
||||||
if (state == null) {
|
if (state == null) {
|
||||||
return newInitialState();
|
return new State();
|
||||||
}
|
}
|
||||||
// The last script that touched the state at this point is the "map" script
|
// The last script that touched the state at this point is the "map" script
|
||||||
CollectionUtils.ensureNoSelfReferences(state.aggState, "Scripted metric aggs map script");
|
CollectionUtils.ensureNoSelfReferences(state.aggState, "Scripted metric aggs map script");
|
||||||
return state.aggState;
|
return state;
|
||||||
}
|
|
||||||
|
|
||||||
private Object resultFor(Map<String, Object> aggState) {
|
|
||||||
if (combineScriptFactory == null) {
|
|
||||||
return aggState;
|
|
||||||
}
|
|
||||||
Object result = combineScriptFactory.newInstance(
|
|
||||||
// Send a deep copy of the params because the script is allowed to mutate it
|
|
||||||
ScriptedMetricAggregatorFactory.deepCopyParams(combineScriptParams, context),
|
|
||||||
aggState
|
|
||||||
).execute();
|
|
||||||
CollectionUtils.ensureNoSelfReferences(result, "Scripted metric aggs combine script");
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, Object> newInitialState() {
|
|
||||||
return initialState == null ? new HashMap<>() : ScriptedMetricAggregatorFactory.deepCopyParams(initialState, context);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -170,23 +161,48 @@ class ScriptedMetricAggregator extends MetricsAggregator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void doClose() {
|
||||||
Releasables.close(states);
|
Releasables.close(states);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class State {
|
private class State {
|
||||||
private final ScriptedMetricAggContexts.MapScript.LeafFactory mapScript;
|
private final ScriptedMetricAggContexts.MapScript.LeafFactory mapScript;
|
||||||
|
private final Map<String, Object> mapScriptParamsForState;
|
||||||
|
private final Map<String, Object> combineScriptParamsForState;
|
||||||
private final Map<String, Object> aggState;
|
private final Map<String, Object> aggState;
|
||||||
private MapScript leafMapScript;
|
private MapScript leafMapScript;
|
||||||
|
|
||||||
State() {
|
State() {
|
||||||
aggState = newInitialState();
|
// Its possible for building the initial state to mutate the parameters as a side effect
|
||||||
|
Map<String, Object> aggParamsForState = ScriptedMetricAggregatorFactory.deepCopyParams(aggParams, context);
|
||||||
|
mapScriptParamsForState = ScriptedMetricAggregatorFactory.mergeParams(aggParamsForState, mapScriptParams);
|
||||||
|
combineScriptParamsForState = ScriptedMetricAggregatorFactory.mergeParams(aggParamsForState, combineScriptParams);
|
||||||
|
aggState = newInitialState(ScriptedMetricAggregatorFactory.mergeParams(aggParamsForState, initScriptParams));
|
||||||
mapScript = mapScriptFactory.newFactory(
|
mapScript = mapScriptFactory.newFactory(
|
||||||
// Send a deep copy of the params because the script is allowed to mutate it
|
ScriptedMetricAggregatorFactory.deepCopyParams(mapScriptParamsForState, context),
|
||||||
ScriptedMetricAggregatorFactory.deepCopyParams(mapScriptParams, context),
|
|
||||||
aggState,
|
aggState,
|
||||||
lookup
|
lookup
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> newInitialState(Map<String, Object> initScriptParamsForState) {
|
||||||
|
if (initScriptFactory == null) {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
Map<String, Object> initialState = new HashMap<>();
|
||||||
|
initScriptFactory.newInstance(initScriptParamsForState, initialState).execute();
|
||||||
|
CollectionUtils.ensureNoSelfReferences(initialState, "Scripted metric aggs init script");
|
||||||
|
return initialState;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object combine() {
|
||||||
|
if (combineScriptFactory == null) {
|
||||||
|
return aggState;
|
||||||
|
}
|
||||||
|
Object result = combineScriptFactory.newInstance(combineScriptParamsForState, aggState).execute();
|
||||||
|
CollectionUtils.ensureNoSelfReferences(result, "Scripted metric aggs combine script");
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
|
|
||||||
package org.elasticsearch.search.aggregations.metrics;
|
package org.elasticsearch.search.aggregations.metrics;
|
||||||
|
|
||||||
import org.elasticsearch.common.util.CollectionUtils;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.index.query.QueryShardContext;
|
import org.elasticsearch.index.query.QueryShardContext;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
import org.elasticsearch.script.ScriptedMetricAggContexts;
|
import org.elasticsearch.script.ScriptedMetricAggContexts;
|
||||||
@ -45,16 +45,26 @@ class ScriptedMetricAggregatorFactory extends AggregatorFactory {
|
|||||||
private final Script reduceScript;
|
private final Script reduceScript;
|
||||||
private final Map<String, Object> aggParams;
|
private final Map<String, Object> aggParams;
|
||||||
private final SearchLookup lookup;
|
private final SearchLookup lookup;
|
||||||
|
@Nullable
|
||||||
private final ScriptedMetricAggContexts.InitScript.Factory initScript;
|
private final ScriptedMetricAggContexts.InitScript.Factory initScript;
|
||||||
private final Map<String, Object> initScriptParams;
|
private final Map<String, Object> initScriptParams;
|
||||||
|
|
||||||
ScriptedMetricAggregatorFactory(String name,
|
ScriptedMetricAggregatorFactory(
|
||||||
ScriptedMetricAggContexts.MapScript.Factory mapScript, Map<String, Object> mapScriptParams,
|
String name,
|
||||||
ScriptedMetricAggContexts.InitScript.Factory initScript, Map<String, Object> initScriptParams,
|
ScriptedMetricAggContexts.MapScript.Factory mapScript,
|
||||||
ScriptedMetricAggContexts.CombineScript.Factory combineScript,
|
Map<String, Object> mapScriptParams,
|
||||||
Map<String, Object> combineScriptParams, Script reduceScript, Map<String, Object> aggParams,
|
@Nullable ScriptedMetricAggContexts.InitScript.Factory initScript,
|
||||||
SearchLookup lookup, QueryShardContext queryShardContext, AggregatorFactory parent,
|
Map<String, Object> initScriptParams,
|
||||||
AggregatorFactories.Builder subFactories, Map<String, Object> metadata) throws IOException {
|
ScriptedMetricAggContexts.CombineScript.Factory combineScript,
|
||||||
|
Map<String, Object> combineScriptParams,
|
||||||
|
Script reduceScript,
|
||||||
|
Map<String, Object> aggParams,
|
||||||
|
SearchLookup lookup,
|
||||||
|
QueryShardContext queryShardContext,
|
||||||
|
AggregatorFactory parent,
|
||||||
|
AggregatorFactories.Builder subFactories,
|
||||||
|
Map<String, Object> metadata
|
||||||
|
) throws IOException {
|
||||||
super(name, queryShardContext, parent, subFactories, metadata);
|
super(name, queryShardContext, parent, subFactories, metadata);
|
||||||
this.mapScript = mapScript;
|
this.mapScript = mapScript;
|
||||||
this.mapScriptParams = mapScriptParams;
|
this.mapScriptParams = mapScriptParams;
|
||||||
@ -73,29 +83,19 @@ class ScriptedMetricAggregatorFactory extends AggregatorFactory {
|
|||||||
boolean collectsFromSingleBucket,
|
boolean collectsFromSingleBucket,
|
||||||
Map<String, Object> metadata) throws IOException {
|
Map<String, Object> metadata) throws IOException {
|
||||||
Map<String, Object> aggParams = this.aggParams == null ? org.elasticsearch.common.collect.Map.of() : this.aggParams;
|
Map<String, Object> aggParams = this.aggParams == null ? org.elasticsearch.common.collect.Map.of() : this.aggParams;
|
||||||
Map<String, Object> initialState = new HashMap<String, Object>();
|
|
||||||
|
|
||||||
ScriptedMetricAggContexts.InitScript initScript = this.initScript.newInstance(
|
|
||||||
mergeParams(aggParams, initScriptParams),
|
|
||||||
initialState
|
|
||||||
);
|
|
||||||
if (initScript != null) {
|
|
||||||
initScript.execute();
|
|
||||||
CollectionUtils.ensureNoSelfReferences(initialState, "Scripted metric aggs init script");
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Object> mapParams = mergeParams(aggParams, mapScriptParams);
|
|
||||||
Map<String, Object> combineParams = mergeParams(aggParams, combineScriptParams);
|
|
||||||
Script reduceScript = deepCopyScript(this.reduceScript, searchContext, aggParams);
|
Script reduceScript = deepCopyScript(this.reduceScript, searchContext, aggParams);
|
||||||
|
|
||||||
return new ScriptedMetricAggregator(
|
return new ScriptedMetricAggregator(
|
||||||
name,
|
name,
|
||||||
lookup,
|
lookup,
|
||||||
initialState,
|
aggParams,
|
||||||
|
initScript,
|
||||||
|
initScriptParams,
|
||||||
mapScript,
|
mapScript,
|
||||||
mapParams,
|
mapScriptParams,
|
||||||
combineScript,
|
combineScript,
|
||||||
combineParams,
|
combineScriptParams,
|
||||||
reduceScript,
|
reduceScript,
|
||||||
searchContext,
|
searchContext,
|
||||||
parent,
|
parent,
|
||||||
@ -140,7 +140,7 @@ class ScriptedMetricAggregatorFactory extends AggregatorFactory {
|
|||||||
return clone;
|
return clone;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, Object> mergeParams(Map<String, Object> agg, Map<String, Object> script) {
|
static Map<String, Object> mergeParams(Map<String, Object> agg, Map<String, Object> script) {
|
||||||
// Start with script params
|
// Start with script params
|
||||||
Map<String, Object> combined = new HashMap<>(script);
|
Map<String, Object> combined = new HashMap<>(script);
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
package org.elasticsearch.search.aggregations.metrics;
|
package org.elasticsearch.search.aggregations.metrics;
|
||||||
|
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
import org.apache.lucene.document.SortedNumericDocValuesField;
|
import org.apache.lucene.document.SortedNumericDocValuesField;
|
||||||
import org.apache.lucene.document.SortedSetDocValuesField;
|
import org.apache.lucene.document.SortedSetDocValuesField;
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
@ -26,12 +27,17 @@ import org.apache.lucene.index.IndexReader;
|
|||||||
import org.apache.lucene.index.RandomIndexWriter;
|
import org.apache.lucene.index.RandomIndexWriter;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||||
|
import org.apache.lucene.search.Query;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.elasticsearch.common.CheckedConsumer;
|
import org.elasticsearch.common.CheckedConsumer;
|
||||||
|
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||||
|
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||||
|
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
|
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.query.QueryShardContext;
|
import org.elasticsearch.index.query.QueryShardContext;
|
||||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||||
@ -42,9 +48,13 @@ import org.elasticsearch.script.ScriptModule;
|
|||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.script.ScriptType;
|
||||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||||
|
import org.elasticsearch.search.aggregations.Aggregator;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
||||||
|
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
|
||||||
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
|
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
|
||||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
||||||
|
import org.elasticsearch.search.internal.SearchContext;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -58,6 +68,8 @@ import java.util.function.Function;
|
|||||||
|
|
||||||
import static java.util.Collections.singleton;
|
import static java.util.Collections.singleton;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
|
public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
|
||||||
|
|
||||||
@ -95,6 +107,13 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
|
|||||||
private static final Script COMBINE_SCRIPT_SELF_REF = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptSelfRef",
|
private static final Script COMBINE_SCRIPT_SELF_REF = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptSelfRef",
|
||||||
Collections.emptyMap());
|
Collections.emptyMap());
|
||||||
|
|
||||||
|
private static final Script INIT_SCRIPT_MAKING_ARRAY = new Script(
|
||||||
|
ScriptType.INLINE,
|
||||||
|
MockScriptEngine.NAME,
|
||||||
|
"initScriptMakingArray",
|
||||||
|
Collections.emptyMap()
|
||||||
|
);
|
||||||
|
|
||||||
private static final Map<String, Function<Map<String, Object>, Object>> SCRIPTS = new HashMap<>();
|
private static final Map<String, Function<Map<String, Object>, Object>> SCRIPTS = new HashMap<>();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
@ -181,6 +200,46 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
|
|||||||
state.put("selfRef", state);
|
state.put("selfRef", state);
|
||||||
return state;
|
return state;
|
||||||
});
|
});
|
||||||
|
SCRIPTS.put("initScriptMakingArray", params -> {
|
||||||
|
Map<String, Object> state = (Map<String, Object>) params.get("state");
|
||||||
|
state.put("array", new String[] {"foo", "bar"});
|
||||||
|
state.put("collector", new ArrayList<Integer>());
|
||||||
|
return state;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private CircuitBreakerService circuitBreakerService;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void mockBreaker() {
|
||||||
|
circuitBreakerService = mock(CircuitBreakerService.class);
|
||||||
|
when(circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST) {
|
||||||
|
private long total = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
|
||||||
|
logger.debug("Used {} grabbing {} for {}", total, bytes, label);
|
||||||
|
total += bytes;
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long addWithoutBreaking(long bytes) {
|
||||||
|
logger.debug("Used {} grabbing {}", total, bytes);
|
||||||
|
total += bytes;
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsed() {
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void afterClose() {
|
||||||
|
assertThat(circuitBreakerService.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -420,8 +479,19 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testInitScriptMakesArray() throws IOException {
|
||||||
|
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
|
||||||
|
aggregationBuilder.initScript(INIT_SCRIPT_MAKING_ARRAY).mapScript(MAP_SCRIPT)
|
||||||
|
.combineScript(COMBINE_SCRIPT).reduceScript(REDUCE_SCRIPT);
|
||||||
|
testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
|
||||||
|
iw.addDocument(new Document());
|
||||||
|
}, (InternalScriptedMetric r) -> {
|
||||||
|
assertEquals(1, r.aggregation());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public void testAsSubAgg() throws IOException {
|
public void testAsSubAgg() throws IOException {
|
||||||
AggregationBuilder aggregationBuilder = new TermsAggregationBuilder("t").field("t")
|
AggregationBuilder aggregationBuilder = new TermsAggregationBuilder("t").field("t").executionHint("map")
|
||||||
.subAggregation(
|
.subAggregation(
|
||||||
new ScriptedMetricAggregationBuilder("scripted").initScript(INIT_SCRIPT)
|
new ScriptedMetricAggregationBuilder("scripted").initScript(INIT_SCRIPT)
|
||||||
.mapScript(MAP_SCRIPT)
|
.mapScript(MAP_SCRIPT)
|
||||||
@ -446,6 +516,25 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
|
|||||||
testCase(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, verify, keywordField("t"), longField("number"));
|
testCase(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, verify, keywordField("t"), longField("number"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected <A extends Aggregator> A createAggregator(
|
||||||
|
Query query,
|
||||||
|
AggregationBuilder aggregationBuilder,
|
||||||
|
IndexSearcher indexSearcher,
|
||||||
|
IndexSettings indexSettings,
|
||||||
|
MultiBucketConsumer bucketConsumer,
|
||||||
|
MappedFieldType... fieldTypes
|
||||||
|
) throws IOException {
|
||||||
|
SearchContext searchContext = createSearchContext(
|
||||||
|
indexSearcher,
|
||||||
|
indexSettings,
|
||||||
|
query,
|
||||||
|
bucketConsumer,
|
||||||
|
circuitBreakerService,
|
||||||
|
fieldTypes
|
||||||
|
);
|
||||||
|
return createAggregator(aggregationBuilder, searchContext);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We cannot use Mockito for mocking QueryShardContext in this case because
|
* We cannot use Mockito for mocking QueryShardContext in this case because
|
||||||
* script-related methods (e.g. QueryShardContext#getLazyExecutableScript)
|
* script-related methods (e.g. QueryShardContext#getLazyExecutableScript)
|
||||||
|
@ -237,9 +237,13 @@ public abstract class AggregatorTestCase extends ESTestCase {
|
|||||||
MultiBucketConsumer bucketConsumer,
|
MultiBucketConsumer bucketConsumer,
|
||||||
MappedFieldType... fieldTypes) throws IOException {
|
MappedFieldType... fieldTypes) throws IOException {
|
||||||
SearchContext searchContext = createSearchContext(indexSearcher, indexSettings, query, bucketConsumer, fieldTypes);
|
SearchContext searchContext = createSearchContext(indexSearcher, indexSettings, query, bucketConsumer, fieldTypes);
|
||||||
|
return createAggregator(aggregationBuilder, searchContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder, SearchContext searchContext)
|
||||||
|
throws IOException {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
A aggregator = (A) aggregationBuilder
|
A aggregator = (A) aggregationBuilder.rewrite(searchContext.getQueryShardContext())
|
||||||
.rewrite(searchContext.getQueryShardContext())
|
|
||||||
.build(searchContext.getQueryShardContext(), null)
|
.build(searchContext.getQueryShardContext(), null)
|
||||||
.create(searchContext, null, true);
|
.create(searchContext, null, true);
|
||||||
return aggregator;
|
return aggregator;
|
||||||
@ -876,6 +880,11 @@ public abstract class AggregatorTestCase extends ESTestCase {
|
|||||||
releasables.clear();
|
releasables.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hook for checking things after all {@link Aggregator}s have been closed.
|
||||||
|
*/
|
||||||
|
protected void afterClose() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make a {@linkplain DateFieldMapper.DateFieldType} for a {@code date}.
|
* Make a {@linkplain DateFieldMapper.DateFieldType} for a {@code date}.
|
||||||
*/
|
*/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user