Fix scripted metric BWC serialization (backport of #63821) (#63897)

We had and an error when serializing fully reduced scripted metrics.
Small typo and sever lack of tests..... Anyway, this fixed the one
character typo and adds a bunch more tests.
This commit is contained in:
Nik Everett 2020-10-20 13:15:26 -04:00 committed by GitHub
parent 6f977fc6f7
commit 8d30766a7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 97 additions and 39 deletions

View File

@ -42,12 +42,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
final Script reduceScript;
private final List<Object> aggregations;
InternalScriptedMetric(String name, Object aggregation, Script reduceScript, Map<String, Object> metadata) {
this(name, Collections.singletonList(aggregation), reduceScript, metadata);
}
private InternalScriptedMetric(String name, List<Object> aggregations, Script reduceScript, Map<String, Object> metadata) {
InternalScriptedMetric(String name, List<Object> aggregations, Script reduceScript, Map<String, Object> metadata) {
super(name, metadata);
this.aggregations = aggregations;
this.reduceScript = reduceScript;
@ -70,11 +65,12 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(reduceScript);
if (out.getVersion().before(Version.V_7_8_0)) {
if (aggregations.size() > 0) {
if (aggregations.size() > 1) {
/*
* I *believe* that this situation can only happen in cross
* cluster search right now. Thus the message. But computers
* are hard.
* If aggregations has more than one entry we're trying to
* serialize an unreduced aggregation. This *should* only
* happen when we're returning a scripted_metric over cross
* cluster search.
*/
throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0");
}
@ -97,7 +93,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
return aggregations.get(0);
}
List<Object> getAggregation() {
List<Object> aggregationsList() {
return aggregations;
}

View File

@ -51,7 +51,7 @@ public class MetricInspectionHelper {
public static boolean hasValue(InternalScriptedMetric agg) {
// TODO better way to know if the scripted metric received documents?
// Could check for null too, but a script might return null on purpose...
return agg.getAggregation().size() > 0 ;
return agg.aggregationsList().size() > 0 ;
}
public static boolean hasValue(InternalTDigestPercentileRanks agg) {

View File

@ -41,6 +41,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.singletonList;
class ScriptedMetricAggregator extends MetricsAggregator {
/**
* Estimated cost to maintain a bucket. Since this aggregator uses
@ -139,7 +141,7 @@ class ScriptedMetricAggregator extends MetricsAggregator {
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
Object result = aggStateForResult(owningBucketOrdinal).combine();
StreamOutput.checkWriteable(result);
return new InternalScriptedMetric(name, result, reduceScript, metadata());
return new InternalScriptedMetric(name, singletonList(result), reduceScript, metadata());
}
private State aggStateForResult(long owningBucketOrdinal) {
@ -157,7 +159,7 @@ class ScriptedMetricAggregator extends MetricsAggregator {
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalScriptedMetric(name, null, reduceScript, metadata());
return new InternalScriptedMetric(name, singletonList(null), reduceScript, metadata());
}
@Override

View File

@ -301,7 +301,7 @@ public class AggregationsTests extends ESTestCase {
singleBucketAggTestCase.subAggregationsSupplier = () -> InternalAggregations.EMPTY;
}
}
aggs.add(testCase.createTestInstance());
aggs.add(testCase.createTestInstanceForXContent());
}
return InternalAggregations.from(aggs);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.Version;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.MockScriptEngine;
@ -28,8 +29,12 @@ import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.Aggregation.CommonFields;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.ArrayList;
@ -41,6 +46,9 @@ import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
public class InternalScriptedMetricTests extends InternalAggregationTestCase<InternalScriptedMetric> {
private static final String REDUCE_SCRIPT_NAME = "reduceScript";
@ -79,8 +87,27 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
if (hasReduceScript) {
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME, params);
}
Object randomValue = randomValue(valueTypes, 0);
return new InternalScriptedMetric(name, randomValue, reduceScript, metadata);
return new InternalScriptedMetric(name, randomAggregations(), reduceScript, metadata);
}
private List<Object> randomAggregations() {
return randomList(randomBoolean() ? 1 : 5, this::randomAggregation);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private Object randomAggregation() {
int levels = randomIntBetween(1, 3);
Supplier[] valueTypes = new Supplier[levels];
for (int l = 0; l < levels; l++) {
if (l < levels - 1) {
valueTypes[l] = randomFrom(nestedValueSuppliers);
} else {
// the last one needs to be a leaf value, not map or
// list
valueTypes[l] = randomFrom(leafValueSuppliers);
}
}
return randomValue(valueTypes, 0);
}
@SuppressWarnings("unchecked")
@ -124,13 +151,23 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
InternalScriptedMetric firstAgg = inputs.get(0);
assertEquals(firstAgg.getName(), reduced.getName());
assertEquals(firstAgg.getMetadata(), reduced.getMetadata());
int size = (int) inputs.stream().mapToLong(i -> i.aggregationsList().size()).sum();
if (hasReduceScript) {
assertEquals(inputs.size(), reduced.aggregation());
assertEquals(size, reduced.aggregation());
} else {
assertEquals(inputs.size(), ((List<?>) reduced.aggregation()).size());
assertEquals(size, ((List<?>) reduced.aggregation()).size());
}
}
@Override
public InternalScriptedMetric createTestInstanceForXContent() {
InternalScriptedMetric aggregation = createTestInstance();
return (InternalScriptedMetric) aggregation.reduce(
singletonList(aggregation),
ReduceContext.forFinalReduction(null, mockScriptService(), null, PipelineTree.EMPTY)
);
}
@Override
protected void assertFromXContent(InternalScriptedMetric aggregation, ParsedAggregation parsedAggregation) {
assertTrue(parsedAggregation instanceof ParsedScriptedMetric);
@ -188,7 +225,7 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
@Override
protected InternalScriptedMetric mutateInstance(InternalScriptedMetric instance) throws IOException {
String name = instance.getName();
Object value = instance.aggregation();
List<Object> aggregationsList = instance.aggregationsList();
Script reduceScript = instance.reduceScript;
Map<String, Object> metadata = instance.getMetadata();
switch (between(0, 3)) {
@ -196,22 +233,7 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
name += randomAlphaOfLength(5);
break;
case 1:
Object newValue = randomValue(valueTypes, 0);
while ((newValue == null && value == null) || (newValue != null && newValue.equals(value))) {
int levels = randomIntBetween(1, 3);
Supplier[] valueTypes = new Supplier[levels];
for (int i = 0; i < levels; i++) {
if (i < levels - 1) {
valueTypes[i] = randomFrom(nestedValueSuppliers);
} else {
// the last one needs to be a leaf value, not map or
// list
valueTypes[i] = randomFrom(leafValueSuppliers);
}
}
newValue = randomValue(valueTypes, 0);
}
value = newValue;
aggregationsList = randomValueOtherThan(aggregationsList, this::randomAggregations);
break;
case 2:
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME + "-mutated", Collections.emptyMap());
@ -227,6 +249,40 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
default:
throw new AssertionError("Illegal randomisation branch");
}
return new InternalScriptedMetric(name, value, reduceScript, metadata);
return new InternalScriptedMetric(name, aggregationsList, reduceScript, metadata);
}
public void testOldSerialization() throws IOException {
// A single element list looks like a fully reduced agg
InternalScriptedMetric original = new InternalScriptedMetric(
"test",
org.elasticsearch.common.collect.List.of("foo"),
new Script("test"),
null
);
original.mergePipelineTreeForBWCSerialization(PipelineTree.EMPTY);
InternalScriptedMetric roundTripped = (InternalScriptedMetric) copyNamedWriteable(
original,
getNamedWriteableRegistry(),
InternalAggregation.class,
VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, VersionUtils.getPreviousVersion(Version.V_7_8_0))
);
assertThat(roundTripped, equalTo(original));
// A multi-element list looks like a non-reduced agg
InternalScriptedMetric unreduced = new InternalScriptedMetric(
"test",
org.elasticsearch.common.collect.List.of("foo", "bar"),
new Script("test"),
null
);
unreduced.mergePipelineTreeForBWCSerialization(PipelineTree.EMPTY);
Exception e = expectThrows(IllegalArgumentException.class, () -> copyNamedWriteable(
unreduced,
getNamedWriteableRegistry(),
InternalAggregation.class,
VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, VersionUtils.getPreviousVersion(Version.V_7_8_0))
));
assertThat(e.getMessage(), equalTo("scripted_metric doesn't support cross cluster search until 7.8.0"));
}
}

View File

@ -441,14 +441,18 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
return createUnmappedInstance(name, metadata);
}
public T createTestInstanceForXContent() {
return createTestInstance();
}
public final void testFromXContent() throws IOException {
final T aggregation = createTestInstance();
final T aggregation = createTestInstanceForXContent();
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
assertFromXContent(aggregation, parsedAggregation);
}
public final void testFromXContentWithRandomFields() throws IOException {
final T aggregation = createTestInstance();
final T aggregation = createTestInstanceForXContent();
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
assertFromXContent(aggregation, parsedAggregation);
}