This commit introduces an optimization for inline scripts. It keeps the compiled ingest script that the ScriptProcessor.Factory has been creating for validation purposes. Previously, the Script Service's cache was leveraged because it was the best way to handle caching of both stored and inline scripts. Since inline scripts are so widely used in Ingest Node, it is probably best to ensure we are using the pre-compiled version from the beginning.
This commit is contained in:
parent
dc7ffb154a
commit
499ad6fcc4
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.ingest.common;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
|
@ -37,6 +38,7 @@ import org.elasticsearch.script.IngestScript;
|
|||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptException;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
|
@ -63,17 +65,19 @@ public final class ScriptProcessor extends AbstractProcessor {
|
|||
|
||||
private final Script script;
|
||||
private final ScriptService scriptService;
|
||||
private final IngestScript precompiledIngestScript;
|
||||
|
||||
/**
|
||||
* Processor that evaluates a script with an ingest document in its context
|
||||
*
|
||||
* @param tag The processor's tag.
|
||||
* @param script The {@link Script} to execute.
|
||||
* @param precompiledIngestScript The {@link Script} precompiled
|
||||
* @param scriptService The {@link ScriptService} used to execute the script.
|
||||
*/
|
||||
ScriptProcessor(String tag, Script script, ScriptService scriptService) {
|
||||
ScriptProcessor(String tag, Script script, @Nullable IngestScript precompiledIngestScript, ScriptService scriptService) {
|
||||
super(tag);
|
||||
this.script = script;
|
||||
this.precompiledIngestScript = precompiledIngestScript;
|
||||
this.scriptService = scriptService;
|
||||
}
|
||||
|
||||
|
@ -84,9 +88,14 @@ public final class ScriptProcessor extends AbstractProcessor {
|
|||
*/
|
||||
@Override
|
||||
public IngestDocument execute(IngestDocument document) {
|
||||
final IngestScript ingestScript;
|
||||
if (precompiledIngestScript == null) {
|
||||
IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT);
|
||||
factory.newInstance(script.getParams()).execute(
|
||||
new DynamicMap(document.getSourceAndMetadata(), PARAMS_FUNCTIONS));
|
||||
ingestScript = factory.newInstance(script.getParams());
|
||||
} else {
|
||||
ingestScript = precompiledIngestScript;
|
||||
}
|
||||
ingestScript.execute(new DynamicMap(document.getSourceAndMetadata(), PARAMS_FUNCTIONS));
|
||||
CollectionUtils.ensureNoSelfReferences(document.getSourceAndMetadata(), "ingest script");
|
||||
return document;
|
||||
}
|
||||
|
@ -100,6 +109,10 @@ public final class ScriptProcessor extends AbstractProcessor {
|
|||
return script;
|
||||
}
|
||||
|
||||
IngestScript getPrecompiledIngestScript() {
|
||||
return precompiledIngestScript;
|
||||
}
|
||||
|
||||
public static final class Factory implements Processor.Factory {
|
||||
private final ScriptService scriptService;
|
||||
|
||||
|
@ -119,13 +132,16 @@ public final class ScriptProcessor extends AbstractProcessor {
|
|||
Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);
|
||||
|
||||
// verify script is able to be compiled before successfully creating processor.
|
||||
IngestScript ingestScript = null;
|
||||
try {
|
||||
scriptService.compile(script, IngestScript.CONTEXT);
|
||||
final IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT);
|
||||
if (ScriptType.INLINE.equals(script.getType())) {
|
||||
ingestScript = factory.newInstance(script.getParams());
|
||||
}
|
||||
} catch (ScriptException e) {
|
||||
throw newConfigurationException(TYPE, processorTag, null, e);
|
||||
}
|
||||
|
||||
return new ScriptProcessor(processorTag, script, scriptService);
|
||||
return new ScriptProcessor(processorTag, script, ingestScript, scriptService);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,10 +20,15 @@
|
|||
package org.elasticsearch.ingest.common;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParseException;
|
||||
import org.elasticsearch.script.IngestScript;
|
||||
import org.elasticsearch.script.MockScriptEngine;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptException;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -55,6 +60,10 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFactoryValidationWithDefaultLang() throws Exception {
|
||||
ScriptService mockedScriptService = mock(ScriptService.class);
|
||||
when(mockedScriptService.compile(any(), any())).thenReturn(mock(IngestScript.Factory.class));
|
||||
factory = new ScriptProcessor.Factory(mockedScriptService);
|
||||
|
||||
Map<String, Object> configMap = new HashMap<>();
|
||||
String randomType = randomFrom("id", "source");
|
||||
configMap.put(randomType, "foo");
|
||||
|
@ -65,6 +74,10 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFactoryValidationWithParams() throws Exception {
|
||||
ScriptService mockedScriptService = mock(ScriptService.class);
|
||||
when(mockedScriptService.compile(any(), any())).thenReturn(mock(IngestScript.Factory.class));
|
||||
factory = new ScriptProcessor.Factory(mockedScriptService);
|
||||
|
||||
Map<String, Object> configMap = new HashMap<>();
|
||||
String randomType = randomFrom("id", "source");
|
||||
Map<String, Object> randomParams = Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
|
@ -98,6 +111,10 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testInlineBackcompat() throws Exception {
|
||||
ScriptService mockedScriptService = mock(ScriptService.class);
|
||||
when(mockedScriptService.compile(any(), any())).thenReturn(mock(IngestScript.Factory.class));
|
||||
factory = new ScriptProcessor.Factory(mockedScriptService);
|
||||
|
||||
Map<String, Object> configMap = new HashMap<>();
|
||||
configMap.put("inline", "code");
|
||||
|
||||
|
@ -121,4 +138,44 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
|
|||
|
||||
assertThat(exception.getMessage(), is("compile-time exception"));
|
||||
}
|
||||
|
||||
public void testInlineIsCompiled() throws Exception {
|
||||
String scriptName = "foo";
|
||||
ScriptService scriptService = new ScriptService(Settings.builder().build(),
|
||||
Collections.singletonMap(
|
||||
Script.DEFAULT_SCRIPT_LANG, new MockScriptEngine(
|
||||
Script.DEFAULT_SCRIPT_LANG,
|
||||
Collections.singletonMap(scriptName, ctx -> {
|
||||
ctx.put("foo", "bar");
|
||||
return null;
|
||||
}),
|
||||
Collections.emptyMap()
|
||||
)
|
||||
), new HashMap<>(ScriptModule.CORE_CONTEXTS));
|
||||
factory = new ScriptProcessor.Factory(scriptService);
|
||||
|
||||
Map<String, Object> configMap = new HashMap<>();
|
||||
configMap.put("source", scriptName);
|
||||
ScriptProcessor processor = factory.create(null, randomAlphaOfLength(10), configMap);
|
||||
assertThat(processor.getScript().getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG));
|
||||
assertThat(processor.getScript().getType(), equalTo(ScriptType.INLINE));
|
||||
assertThat(processor.getScript().getParams(), equalTo(Collections.emptyMap()));
|
||||
assertNotNull(processor.getPrecompiledIngestScript());
|
||||
Map<String, Object> ctx = new HashMap<>();
|
||||
processor.getPrecompiledIngestScript().execute(ctx);
|
||||
assertThat(ctx.get("foo"), equalTo("bar"));
|
||||
}
|
||||
|
||||
public void testStoredIsNotCompiled() throws Exception {
|
||||
ScriptService mockedScriptService = mock(ScriptService.class);
|
||||
when(mockedScriptService.compile(any(), any())).thenReturn(mock(IngestScript.Factory.class));
|
||||
factory = new ScriptProcessor.Factory(mockedScriptService);
|
||||
Map<String, Object> configMap = new HashMap<>();
|
||||
configMap.put("id", "script_name");
|
||||
ScriptProcessor processor = factory.create(null, randomAlphaOfLength(10), configMap);
|
||||
assertNull(processor.getScript().getLang());
|
||||
assertThat(processor.getScript().getType(), equalTo(ScriptType.STORED));
|
||||
assertThat(processor.getScript().getParams(), equalTo(Collections.emptyMap()));
|
||||
assertNull(processor.getPrecompiledIngestScript());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,12 +22,14 @@ package org.elasticsearch.ingest.common;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.script.IngestScript;
|
||||
import org.elasticsearch.script.MockScriptEngine;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -38,18 +40,22 @@ import static org.hamcrest.core.Is.is;
|
|||
|
||||
public class ScriptProcessorTests extends ESTestCase {
|
||||
|
||||
public void testScripting() throws Exception {
|
||||
int randomBytesIn = randomInt();
|
||||
int randomBytesOut = randomInt();
|
||||
int randomBytesTotal = randomBytesIn + randomBytesOut;
|
||||
private ScriptService scriptService;
|
||||
private Script script;
|
||||
private IngestScript ingestScript;
|
||||
|
||||
@Before
|
||||
public void setupScripting() {
|
||||
String scriptName = "script";
|
||||
ScriptService scriptService = new ScriptService(Settings.builder().build(),
|
||||
scriptService = new ScriptService(Settings.builder().build(),
|
||||
Collections.singletonMap(
|
||||
Script.DEFAULT_SCRIPT_LANG, new MockScriptEngine(
|
||||
Script.DEFAULT_SCRIPT_LANG,
|
||||
Collections.singletonMap(
|
||||
scriptName, ctx -> {
|
||||
ctx.put("bytes_total", randomBytesTotal);
|
||||
Integer bytesIn = (Integer) ctx.get("bytes_in");
|
||||
Integer bytesOut = (Integer) ctx.get("bytes_out");
|
||||
ctx.put("bytes_total", bytesIn + bytesOut);
|
||||
return null;
|
||||
}
|
||||
),
|
||||
|
@ -58,21 +64,37 @@ public class ScriptProcessorTests extends ESTestCase {
|
|||
),
|
||||
new HashMap<>(ScriptModule.CORE_CONTEXTS)
|
||||
);
|
||||
Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap());
|
||||
script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap());
|
||||
ingestScript = scriptService.compile(script, IngestScript.CONTEXT).newInstance(script.getParams());
|
||||
}
|
||||
|
||||
public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
|
||||
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), script, null, scriptService);
|
||||
IngestDocument ingestDocument = randomDocument();
|
||||
processor.execute(ingestDocument);
|
||||
assertIngestDocument(ingestDocument);
|
||||
}
|
||||
|
||||
public void testScriptingWithPrecompiledIngestScript() {
|
||||
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), script, ingestScript, scriptService);
|
||||
IngestDocument ingestDocument = randomDocument();
|
||||
processor.execute(ingestDocument);
|
||||
assertIngestDocument(ingestDocument);
|
||||
}
|
||||
|
||||
private IngestDocument randomDocument() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("bytes_in", randomInt());
|
||||
document.put("bytes_out", randomInt());
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
|
||||
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), script, scriptService);
|
||||
|
||||
processor.execute(ingestDocument);
|
||||
return RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
}
|
||||
|
||||
private void assertIngestDocument(IngestDocument ingestDocument) {
|
||||
assertThat(ingestDocument.getSourceAndMetadata(), hasKey("bytes_in"));
|
||||
assertThat(ingestDocument.getSourceAndMetadata(), hasKey("bytes_out"));
|
||||
assertThat(ingestDocument.getSourceAndMetadata(), hasKey("bytes_total"));
|
||||
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(randomBytesTotal));
|
||||
int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class);
|
||||
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal));
|
||||
}
|
||||
|
||||
public void testTypeDeprecation() throws Exception {
|
||||
|
@ -94,7 +116,7 @@ public class ScriptProcessorTests extends ESTestCase {
|
|||
);
|
||||
Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap());
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
|
||||
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), script, scriptService);
|
||||
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), script, null, scriptService);
|
||||
processor.execute(ingestDocument);
|
||||
assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated.");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue