Require combine and reduce scripts in scripted metrics aggregation (#33452)

* Make text message not required in constructor for slack

* Remove unnecessary comments in test file

* Throw exception when reduce or combine is not provided; update tests

* Update integration tests for scripted metrics to always include reduce and combine

* Remove some old changes from previous branches

* Rearrange script presence checks to be earlier in build

* Change null check order in script builder for aggregated metrics; correct test scripts in IT

* Add breaking change details to PR
This commit is contained in:
albendz 2018-10-03 07:22:01 -07:00 committed by Colin Goodheart-Smithe
parent 9d36cbaf16
commit f09190c14d
4 changed files with 129 additions and 36 deletions

View File

@ -26,3 +26,9 @@ has been removed. `missing_bucket` should be used instead.
The object used to share aggregation state between the scripts in a Scripted Metric
Aggregation is now a variable called `state` available in the script context, rather than
being provided via the `params` object as `params._agg`.
[float]
==== Make metric aggregation script parameters `reduce_script` and `combine_script` mandatory
The metric aggregation has been changed to require these two script parameters to ensure users are
explicitly defining how their data is processed.

View File

@ -196,6 +196,14 @@ public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder
protected ScriptedMetricAggregatorFactory doBuild(SearchContext context, AggregatorFactory<?> parent,
Builder subfactoriesBuilder) throws IOException {
if (combineScript == null) {
throw new IllegalArgumentException("[combineScript] must not be null: [" + name + "]");
}
if(reduceScript == null) {
throw new IllegalArgumentException("[reduceScript] must not be null: [" + name + "]");
}
QueryShardContext queryShardContext = context.getQueryShardContext();
// Extract params from scripts and pass them along to ScriptedMetricAggregatorFactory, since it won't have
@ -215,16 +223,14 @@ public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder
ScriptedMetricAggContexts.MapScript.CONTEXT);
Map<String, Object> mapScriptParams = mapScript.getParams();
ScriptedMetricAggContexts.CombineScript.Factory compiledCombineScript;
Map<String, Object> combineScriptParams;
if (combineScript != null) {
compiledCombineScript = queryShardContext.getScriptService().compile(combineScript,
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();
} else {
compiledCombineScript = (p, a) -> null;
combineScriptParams = Collections.emptyMap();
}
compiledCombineScript = queryShardContext.getScriptService().compile(combineScript,
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();
return new ScriptedMetricAggregatorFactory(name, compiledMapScript, mapScriptParams, compiledInitScript,
initScriptParams, compiledCombineScript, combineScriptParams, reduceScript,
params, queryShardContext.lookup(), context, parent, subfactoriesBuilder, metaData);

View File

@ -54,6 +54,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
private static final Script MAP_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScript", Collections.emptyMap());
private static final Script COMBINE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScript",
Collections.emptyMap());
private static final Script REDUCE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "reduceScript",
Collections.emptyMap());
private static final Script INIT_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptScore",
Collections.emptyMap());
@ -61,6 +63,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_NOOP = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptNoop",
Collections.emptyMap());
private static final Script INIT_SCRIPT_PARAMS = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptParams",
Collections.singletonMap("initialValue", 24));
@ -96,6 +100,14 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return ((List<Integer>) state.get("collector")).stream().mapToInt(Integer::intValue).sum();
});
SCRIPTS.put("combineScriptNoop", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});
SCRIPTS.put("reduceScript", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});
SCRIPTS.put("initScriptScore", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
@ -160,7 +172,7 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.mapScript(MAP_SCRIPT); // map script is mandatory, even if its not used in this case
aggregationBuilder.mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_NOOP).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@ -169,9 +181,6 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
}
/**
* without combine script, the "states" map should contain a list of the size of the number of documents matched
*/
public void testScriptedMetricWithoutCombine() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
@ -182,15 +191,28 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@SuppressWarnings("unchecked")
Map<String, Object> agg = (Map<String, Object>) scriptedMetric.aggregation();
@SuppressWarnings("unchecked")
List<Integer> list = (List<Integer>) agg.get("collector");
assertEquals(numDocs, list.size());
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[combineScript] must not be null: [scriptedMetric]");
}
}
}
public void testScriptedMetricWithoutReduce() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
for (int i = 0; i < numDocs; i++) {
indexWriter.addDocument(singleton(new SortedNumericDocValuesField("number", i)));
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[reduceScript] must not be null: [scriptedMetric]");
}
}
}
@ -208,7 +230,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@ -230,7 +253,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE).combineScript(COMBINE_SCRIPT_SCORE);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE)
.combineScript(COMBINE_SCRIPT_SCORE).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@ -250,7 +274,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
// The result value depends on the script params.
@ -270,8 +295,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
Map<String, Object> aggParams = Collections.singletonMap(CONFLICTING_PARAM_NAME, "blah");
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).
combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
@ -289,7 +314,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
@ -309,7 +335,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
@ -326,7 +353,8 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_SELF_REF).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)

View File

@ -153,6 +153,14 @@ public class ScriptedMetricIT extends ESIntegTestCase {
return newAggregation;
});
scripts.put("no-op aggregation", vars -> {
return (Map<String, Object>) vars.get("state");
});
scripts.put("no-op list aggregation", vars -> {
return (List<List<?>>) vars.get("states");
});
// Equivalent to:
//
// newaggregation = [];
@ -188,6 +196,11 @@ public class ScriptedMetricIT extends ESIntegTestCase {
Integer sum = 0;
List<Map<String, Object>> states = (List<Map<String, Object>>) vars.get("states");
if(states == null) {
return newAggregation;
}
for (Map<String, Object> state : states) {
List<?> list = (List<?>) state.get("list");
if (list != null) {
@ -328,10 +341,14 @@ public class ScriptedMetricIT extends ESIntegTestCase {
public void testMap() {
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state['count'] = 1", Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "no-op aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap());
SearchResponse response = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(scriptedMetric("scripted").mapScript(mapScript))
.addAggregation(scriptedMetric("scripted").mapScript(mapScript).combineScript(combineScript).reduceScript(reduceScript))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
@ -369,10 +386,18 @@ public class ScriptedMetricIT extends ESIntegTestCase {
Map<String, Object> aggregationParams = Collections.singletonMap("param2", 1);
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state[param1] = param2", scriptParams);
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "no-op aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap());
SearchResponse response = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(scriptedMetric("scripted").params(aggregationParams).mapScript(mapScript))
.addAggregation(scriptedMetric("scripted")
.params(aggregationParams)
.mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
@ -423,7 +448,11 @@ public class ScriptedMetricIT extends ESIntegTestCase {
.initScript(
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap()))
.mapScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"state.list.add(vars.multiplier)", Collections.emptyMap())))
"state.list.add(vars.multiplier)", Collections.emptyMap()))
.combineScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op aggregation", Collections.emptyMap()))
.reduceScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap())))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
@ -466,6 +495,8 @@ public class ScriptedMetricIT extends ESIntegTestCase {
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(1)", Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap());
SearchResponse response = client()
.prepareSearch("idx")
@ -474,7 +505,8 @@ public class ScriptedMetricIT extends ESIntegTestCase {
scriptedMetric("scripted")
.params(params)
.mapScript(mapScript)
.combineScript(combineScript))
.combineScript(combineScript)
.reduceScript(reduceScript))
.execute().actionGet();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
@ -519,6 +551,8 @@ public class ScriptedMetricIT extends ESIntegTestCase {
Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap());
SearchResponse response = client()
.prepareSearch("idx")
@ -528,7 +562,8 @@ public class ScriptedMetricIT extends ESIntegTestCase {
.params(params)
.initScript(initScript)
.mapScript(mapScript)
.combineScript(combineScript))
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
@ -713,6 +748,8 @@ public class ScriptedMetricIT extends ESIntegTestCase {
Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "no-op aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states' state.list values as a new aggregation", Collections.emptyMap());
@ -724,6 +761,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
.params(params)
.initScript(initScript)
.mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
@ -752,6 +790,8 @@ public class ScriptedMetricIT extends ESIntegTestCase {
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "no-op aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states' state.list values as a new aggregation", Collections.emptyMap());
@ -762,6 +802,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
scriptedMetric("scripted")
.params(params)
.mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
@ -980,6 +1021,11 @@ public class ScriptedMetricIT extends ESIntegTestCase {
*/
public void testDontCacheScripts() throws Exception {
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state['count'] = 1", Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "no-op aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap());
assertAcked(prepareCreate("cache_test_idx").addMapping("type", "d", "type=long")
.setSettings(Settings.builder().put("requests.cache.enable", true).put("number_of_shards", 1).put("number_of_replicas", 1))
.get());
@ -994,7 +1040,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
// Test that a request using a script does not get cached
SearchResponse r = client().prepareSearch("cache_test_idx").setSize(0)
.addAggregation(scriptedMetric("foo").mapScript(mapScript)).get();
.addAggregation(scriptedMetric("foo").mapScript(mapScript).combineScript(combineScript).reduceScript(reduceScript)).get();
assertSearchResponse(r);
assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache()
@ -1006,10 +1052,17 @@ public class ScriptedMetricIT extends ESIntegTestCase {
public void testConflictingAggAndScriptParams() {
Map<String, Object> params = Collections.singletonMap("param1", "12");
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(1)", params);
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "no-op aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"no-op list aggregation", Collections.emptyMap());
SearchRequestBuilder builder = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(scriptedMetric("scripted").params(params).mapScript(mapScript));
.addAggregation(scriptedMetric("scripted")
.params(params).mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript));
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, builder::get);
assertThat(ex.getCause().getMessage(), containsString("Parameter name \"param1\" used in both aggregation and script parameters"));