Eagerly compile condition script at processor creation (#58882)
Ingest script processors were changed to eagerly compile their scripts when the ingest pipeline is saved, but conditional scripts were missed. This commit adds eager compilation to ingest conditional scripts, which will help surface errors before runtime, as well as adds tests for each case we might encounter between inline and stored script compilation failures. closes #58864
This commit is contained in:
parent
2c04685b81
commit
d825d4352c
|
@ -24,7 +24,9 @@ import org.elasticsearch.common.logging.DeprecationLogger;
|
||||||
import org.elasticsearch.script.DynamicMap;
|
import org.elasticsearch.script.DynamicMap;
|
||||||
import org.elasticsearch.script.IngestConditionalScript;
|
import org.elasticsearch.script.IngestConditionalScript;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
|
import org.elasticsearch.script.ScriptException;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
|
import org.elasticsearch.script.ScriptType;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -41,6 +43,8 @@ import java.util.function.Function;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
|
||||||
|
|
||||||
public class ConditionalProcessor extends AbstractProcessor implements WrappingProcessor {
|
public class ConditionalProcessor extends AbstractProcessor implements WrappingProcessor {
|
||||||
|
|
||||||
private static final DeprecationLogger deprecationLogger =
|
private static final DeprecationLogger deprecationLogger =
|
||||||
|
@ -55,12 +59,11 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
|
||||||
static final String TYPE = "conditional";
|
static final String TYPE = "conditional";
|
||||||
|
|
||||||
private final Script condition;
|
private final Script condition;
|
||||||
|
|
||||||
private final ScriptService scriptService;
|
private final ScriptService scriptService;
|
||||||
|
|
||||||
private final Processor processor;
|
private final Processor processor;
|
||||||
private final IngestMetric metric;
|
private final IngestMetric metric;
|
||||||
private final LongSupplier relativeTimeProvider;
|
private final LongSupplier relativeTimeProvider;
|
||||||
|
private final IngestConditionalScript precompiledConditionScript;
|
||||||
|
|
||||||
ConditionalProcessor(String tag, String description, Script script, ScriptService scriptService, Processor processor) {
|
ConditionalProcessor(String tag, String description, Script script, ScriptService scriptService, Processor processor) {
|
||||||
this(tag, description, script, scriptService, processor, System::nanoTime);
|
this(tag, description, script, scriptService, processor, System::nanoTime);
|
||||||
|
@ -74,6 +77,18 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
|
||||||
this.processor = processor;
|
this.processor = processor;
|
||||||
this.metric = new IngestMetric();
|
this.metric = new IngestMetric();
|
||||||
this.relativeTimeProvider = relativeTimeProvider;
|
this.relativeTimeProvider = relativeTimeProvider;
|
||||||
|
|
||||||
|
try {
|
||||||
|
final IngestConditionalScript.Factory factory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
|
||||||
|
if (ScriptType.INLINE.equals(script.getType())) {
|
||||||
|
precompiledConditionScript = factory.newInstance(script.getParams());
|
||||||
|
} else {
|
||||||
|
// stored script, so will have to compile at runtime
|
||||||
|
precompiledConditionScript = null;
|
||||||
|
}
|
||||||
|
} catch (ScriptException e) {
|
||||||
|
throw newConfigurationException(TYPE, tag, null, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -110,8 +125,11 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean evaluate(IngestDocument ingestDocument) {
|
boolean evaluate(IngestDocument ingestDocument) {
|
||||||
IngestConditionalScript script =
|
IngestConditionalScript script = precompiledConditionScript;
|
||||||
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
|
if (script == null) {
|
||||||
|
IngestConditionalScript.Factory factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
|
||||||
|
script = factory.newInstance(condition.getParams());
|
||||||
|
}
|
||||||
return script.execute(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
|
return script.execute(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -415,7 +415,7 @@ public class ScriptService implements Closeable, ClusterStateApplier {
|
||||||
return scriptMetadata.getStoredScripts();
|
return scriptMetadata.getStoredScripts();
|
||||||
}
|
}
|
||||||
|
|
||||||
StoredScriptSource getScriptFromClusterState(String id) {
|
protected StoredScriptSource getScriptFromClusterState(String id) {
|
||||||
ScriptMetadata scriptMetadata = clusterState.metadata().custom(ScriptMetadata.TYPE);
|
ScriptMetadata scriptMetadata = clusterState.metadata().custom(ScriptMetadata.TYPE);
|
||||||
|
|
||||||
if (scriptMetadata == null) {
|
if (scriptMetadata == null) {
|
||||||
|
|
|
@ -20,13 +20,18 @@
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.script.IngestConditionalScript;
|
||||||
import org.elasticsearch.script.MockScriptEngine;
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
|
import org.elasticsearch.script.MockScriptService;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
|
import org.elasticsearch.script.ScriptException;
|
||||||
import org.elasticsearch.script.ScriptModule;
|
import org.elasticsearch.script.ScriptModule;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.script.ScriptType;
|
||||||
|
import org.elasticsearch.script.StoredScriptSource;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -34,6 +39,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
|
@ -195,6 +201,63 @@ public class ConditionalProcessorTests extends ESTestCase {
|
||||||
assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated.");
|
assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPrecompiledError() {
|
||||||
|
ScriptService scriptService = MockScriptService.singleContext(IngestConditionalScript.CONTEXT, code -> {
|
||||||
|
throw new ScriptException("bad script", new ParseException("error", 0),
|
||||||
|
org.elasticsearch.common.collect.List.of(), "", "lang", null);
|
||||||
|
}, org.elasticsearch.common.collect.Map.of());
|
||||||
|
Script script = new Script(ScriptType.INLINE, "lang", "foo", org.elasticsearch.common.collect.Map.of());
|
||||||
|
ScriptException e = expectThrows(ScriptException.class, () ->
|
||||||
|
new ConditionalProcessor(null, null, script, scriptService, null));
|
||||||
|
assertThat(e.getMessage(), equalTo("bad script"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRuntimeCompileError() {
|
||||||
|
AtomicBoolean fail = new AtomicBoolean(false);
|
||||||
|
Map<String, StoredScriptSource> storedScripts = new HashMap<>();
|
||||||
|
storedScripts.put("foo", new StoredScriptSource("lang", "", org.elasticsearch.common.collect.Map.of()));
|
||||||
|
ScriptService scriptService = MockScriptService.singleContext(IngestConditionalScript.CONTEXT, code -> {
|
||||||
|
if (fail.get()) {
|
||||||
|
throw new ScriptException("bad script", new ParseException("error", 0),
|
||||||
|
org.elasticsearch.common.collect.List.of(), "", "lang", null);
|
||||||
|
} else {
|
||||||
|
return params -> new IngestConditionalScript(params) {
|
||||||
|
@Override
|
||||||
|
public boolean execute(Map<String, Object> ctx) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}, storedScripts);
|
||||||
|
Script script = new Script(ScriptType.STORED, null, "foo", org.elasticsearch.common.collect.Map.of());
|
||||||
|
ConditionalProcessor processor = new ConditionalProcessor(null, null, script, scriptService, null);
|
||||||
|
fail.set(true);
|
||||||
|
// must change the script source or the cached version will be used
|
||||||
|
storedScripts.put("foo", new StoredScriptSource("lang", "changed", org.elasticsearch.common.collect.Map.of()));
|
||||||
|
IngestDocument ingestDoc = new IngestDocument(org.elasticsearch.common.collect.Map.of(),
|
||||||
|
org.elasticsearch.common.collect.Map.of());
|
||||||
|
processor.execute(ingestDoc, (doc, e) -> {
|
||||||
|
assertThat(e.getMessage(), equalTo("bad script"));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRuntimeError() {
|
||||||
|
ScriptService scriptService = MockScriptService.singleContext(IngestConditionalScript.CONTEXT,
|
||||||
|
code -> params -> new IngestConditionalScript(params) {
|
||||||
|
@Override
|
||||||
|
public boolean execute(Map<String, Object> ctx) {
|
||||||
|
throw new IllegalArgumentException("runtime problem");
|
||||||
|
}
|
||||||
|
}, org.elasticsearch.common.collect.Map.of());
|
||||||
|
Script script = new Script(ScriptType.INLINE, "lang", "foo", org.elasticsearch.common.collect.Map.of());
|
||||||
|
ConditionalProcessor processor = new ConditionalProcessor(null, null, script, scriptService, null);
|
||||||
|
IngestDocument ingestDoc = new IngestDocument(org.elasticsearch.common.collect.Map.of(),
|
||||||
|
org.elasticsearch.common.collect.Map.of());
|
||||||
|
processor.execute(ingestDoc, (doc, e) -> {
|
||||||
|
assertThat(e.getMessage(), equalTo("runtime problem"));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private static void assertMutatingCtxThrows(Consumer<Map<String, Object>> mutation) throws Exception {
|
private static void assertMutatingCtxThrows(Consumer<Map<String, Object>> mutation) throws Exception {
|
||||||
String scriptName = "conditionalScript";
|
String scriptName = "conditionalScript";
|
||||||
CompletableFuture<Exception> expectedException = new CompletableFuture<>();
|
CompletableFuture<Exception> expectedException = new CompletableFuture<>();
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class ScriptServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
StoredScriptSource getScriptFromClusterState(String id) {
|
protected StoredScriptSource getScriptFromClusterState(String id) {
|
||||||
//mock the script that gets retrieved from an index
|
//mock the script that gets retrieved from an index
|
||||||
return new StoredScriptSource("test", "1+1", Collections.emptyMap());
|
return new StoredScriptSource("test", "1+1", Collections.emptyMap());
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.node.MockNode;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class MockScriptService extends ScriptService {
|
public class MockScriptService extends ScriptService {
|
||||||
/**
|
/**
|
||||||
|
@ -39,4 +41,32 @@ public class MockScriptService extends ScriptService {
|
||||||
boolean compilationLimitsEnabled() {
|
boolean compilationLimitsEnabled() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> MockScriptService singleContext(ScriptContext<T> context, Function<String, T> compile,
|
||||||
|
Map<String, StoredScriptSource> storedLookup) {
|
||||||
|
ScriptEngine engine = new ScriptEngine() {
|
||||||
|
@Override
|
||||||
|
public String getType() {
|
||||||
|
return "lang";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <FactoryType> FactoryType compile(String name, String code, ScriptContext<FactoryType> context,
|
||||||
|
Map<String, String> params) {
|
||||||
|
return context.factoryClazz.cast(compile.apply(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<ScriptContext<?>> getSupportedContexts() {
|
||||||
|
return org.elasticsearch.common.collect.Set.of(context);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return new MockScriptService(Settings.EMPTY, org.elasticsearch.common.collect.Map.of("lang", engine),
|
||||||
|
org.elasticsearch.common.collect.Map.of(context.name, context)) {
|
||||||
|
@Override
|
||||||
|
protected StoredScriptSource getScriptFromClusterState(String id) {
|
||||||
|
return storedLookup.get(id);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue