From cc4d7059bfda42042e4e320c7c2bb228c0fe4691 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 30 Aug 2018 03:46:39 +0200 Subject: [PATCH] Ingest: Add conditional per processor (#32398) * Ingest: Add conditional per processor * closes #21248 --- .../ingest/common/ForEachProcessor.java | 11 +- .../ingest/common/IngestCommonPlugin.java | 2 +- .../common/ForEachProcessorFactoryTests.java | 16 +- .../test/ingest/210_conditional_processor.yml | 81 ++++ .../ingest/SimulatePipelineRequest.java | 6 +- .../ingest/ConditionalProcessor.java | 381 ++++++++++++++++++ .../ingest/ConfigurationUtils.java | 55 ++- .../elasticsearch/ingest/IngestService.java | 16 +- .../org/elasticsearch/ingest/Pipeline.java | 8 +- .../script/IngestConditionalScript.java | 51 +++ .../elasticsearch/script/ScriptModule.java | 1 + .../ingest/ConditionalProcessorTests.java | 141 +++++++ .../ingest/ConfigurationUtilsTests.java | 19 +- .../ingest/PipelineFactoryTests.java | 30 +- .../script/MockScriptEngine.java | 8 + 15 files changed, 788 insertions(+), 38 deletions(-) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml create mode 100644 server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java create mode 100644 server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java create mode 100644 server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index f5bf9cc9591..31c0ae8cc3d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import org.elasticsearch.script.ScriptService; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -96,6 +97,13 @@ public final class ForEachProcessor extends AbstractProcessor { } public static final class Factory implements Processor.Factory { + + private final ScriptService scriptService; + + Factory(ScriptService scriptService) { + this.scriptService = scriptService; + } + @Override public ForEachProcessor create(Map factories, String tag, Map config) throws Exception { @@ -107,7 +115,8 @@ public final class ForEachProcessor extends AbstractProcessor { throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type"); } Map.Entry> entry = entries.iterator().next(); - Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); + Processor processor = + ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); return new ForEachProcessor(tag, field, processor, ignoreMissing); } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 1ed8b6058e6..8b048282814 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -72,7 +72,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory()); processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory()); processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)); - processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory()); + processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index f382ad8dcfb..7ab19c4147e 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -30,14 +31,17 @@ import java.util.HashMap; import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; public class ForEachProcessorFactoryTests extends ESTestCase { + private final ScriptService scriptService = mock(ScriptService.class); + public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -53,7 +57,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -71,7 +75,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -84,7 +88,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -97,7 +101,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -105,7 +109,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml new file mode 100644 index 00000000000..532519c4ca0 --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml @@ -0,0 +1,81 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test conditional processor fulfilled condition": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "if" : "ctx.conditional_field == 'bar'", + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {bytes_source_field: "1kb", conditional_field: "bar"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.conditional_field: "bar" } + - match: { _source.bytes_target_field: 1024 } + +--- +"Test conditional processor unfulfilled condition": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "if" : "ctx.conditional_field == 'foo'", + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {bytes_source_field: "1kb", conditional_field: "bar"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.conditional_field: "bar" } + - is_false: _source.bytes_target_field + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index fecee5f265f..7514a41f575 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -171,9 +171,11 @@ public class SimulatePipelineRequest extends ActionRequest implements ToXContent return new Parsed(pipeline, ingestDocumentList, verbose); } - static Parsed parse(Map config, boolean verbose, IngestService pipelineStore) throws Exception { + static Parsed parse(Map config, boolean verbose, IngestService ingestService) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); - Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); + Pipeline pipeline = Pipeline.create( + SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService() + ); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java new file mode 100644 index 00000000000..d1eb651acae --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -0,0 +1,381 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.elasticsearch.script.IngestConditionalScript; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService; + +public class ConditionalProcessor extends AbstractProcessor { + + static final String TYPE = "conditional"; + + private final Script condition; + + private final ScriptService scriptService; + + private final Processor processor; + + ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { + super(tag); + this.condition = script; + this.scriptService = scriptService; + this.processor = processor; + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + IngestConditionalScript script = + scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); + if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { + processor.execute(ingestDocument); + } + } + + @Override + public String getType() { + return TYPE; + } + + private static Object wrapUnmodifiable(Object raw) { + // Wraps all mutable types that the JSON parser can create by immutable wrappers. + // Any inputs not wrapped are assumed to be immutable + if (raw instanceof Map) { + return new UnmodifiableIngestData((Map) raw); + } else if (raw instanceof List) { + return new UnmodifiableIngestList((List) raw); + } else if (raw instanceof byte[]) { + return ((byte[]) raw).clone(); + } + return raw; + } + + private static UnsupportedOperationException unmodifiableException() { + return new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + private static final class UnmodifiableIngestData implements Map { + + private final Map data; + + UnmodifiableIngestData(Map data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public boolean containsKey(final Object key) { + return data.containsKey(key); + } + + @Override + public boolean containsValue(final Object value) { + return data.containsValue(value); + } + + @Override + public Object get(final Object key) { + return wrapUnmodifiable(data.get(key)); + } + + @Override + public Object put(final String key, final Object value) { + throw unmodifiableException(); + } + + @Override + public Object remove(final Object key) { + throw unmodifiableException(); + } + + @Override + public void putAll(final Map m) { + throw unmodifiableException(); + } + + @Override + public void clear() { + throw unmodifiableException(); + } + + @Override + public Set keySet() { + return Collections.unmodifiableSet(data.keySet()); + } + + @Override + public Collection values() { + return new UnmodifiableIngestList(new ArrayList<>(data.values())); + } + + @Override + public Set> entrySet() { + return data.entrySet().stream().map(entry -> + new Entry() { + @Override + public String getKey() { + return entry.getKey(); + } + + @Override + public Object getValue() { + return wrapUnmodifiable(entry.getValue()); + } + + @Override + public Object setValue(final Object value) { + throw unmodifiableException(); + } + + @Override + public boolean equals(final Object o) { + return entry.equals(o); + } + + @Override + public int hashCode() { + return entry.hashCode(); + } + }).collect(Collectors.toSet()); + } + } + + private static final class UnmodifiableIngestList implements List { + + private final List data; + + UnmodifiableIngestList(List data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public boolean contains(final Object o) { + return data.contains(o); + } + + @Override + public Iterator iterator() { + Iterator wrapped = data.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return wrapped.hasNext(); + } + + @Override + public Object next() { + return wrapped.next(); + } + + @Override + public void remove() { + throw unmodifiableException(); + } + }; + } + + @Override + public Object[] toArray() { + Object[] wrapped = data.toArray(new Object[0]); + for (int i = 0; i < wrapped.length; i++) { + wrapped[i] = wrapUnmodifiable(wrapped[i]); + } + return wrapped; + } + + @Override + public T[] toArray(final T[] a) { + Object[] raw = data.toArray(new Object[0]); + T[] wrapped = (T[]) Arrays.copyOf(raw, a.length, a.getClass()); + for (int i = 0; i < wrapped.length; i++) { + wrapped[i] = (T) wrapUnmodifiable(wrapped[i]); + } + return wrapped; + } + + @Override + public boolean add(final Object o) { + throw unmodifiableException(); + } + + @Override + public boolean remove(final Object o) { + throw unmodifiableException(); + } + + @Override + public boolean containsAll(final Collection c) { + return data.contains(c); + } + + @Override + public boolean addAll(final Collection c) { + throw unmodifiableException(); + } + + @Override + public boolean addAll(final int index, final Collection c) { + throw unmodifiableException(); + } + + @Override + public boolean removeAll(final Collection c) { + throw unmodifiableException(); + } + + @Override + public boolean retainAll(final Collection c) { + throw unmodifiableException(); + } + + @Override + public void clear() { + throw unmodifiableException(); + } + + @Override + public Object get(final int index) { + return wrapUnmodifiable(data.get(index)); + } + + @Override + public Object set(final int index, final Object element) { + throw unmodifiableException(); + } + + @Override + public void add(final int index, final Object element) { + throw unmodifiableException(); + } + + @Override + public Object remove(final int index) { + throw unmodifiableException(); + } + + @Override + public int indexOf(final Object o) { + return data.indexOf(o); + } + + @Override + public int lastIndexOf(final Object o) { + return data.lastIndexOf(o); + } + + @Override + public ListIterator listIterator() { + return new UnmodifiableListIterator(data.listIterator()); + } + + @Override + public ListIterator listIterator(final int index) { + return new UnmodifiableListIterator(data.listIterator(index)); + } + + @Override + public List subList(final int fromIndex, final int toIndex) { + return new UnmodifiableIngestList(data.subList(fromIndex, toIndex)); + } + + private static final class UnmodifiableListIterator implements ListIterator { + + private final ListIterator data; + + UnmodifiableListIterator(ListIterator data) { + this.data = data; + } + + @Override + public boolean hasNext() { + return data.hasNext(); + } + + @Override + public Object next() { + return wrapUnmodifiable(data.next()); + } + + @Override + public boolean hasPrevious() { + return data.hasPrevious(); + } + + @Override + public Object previous() { + return wrapUnmodifiable(data.previous()); + } + + @Override + public int nextIndex() { + return data.nextIndex(); + } + + @Override + public int previousIndex() { + return data.previousIndex(); + } + + @Override + public void remove() { + throw unmodifiableException(); + } + + @Override + public void set(final Object o) { + throw unmodifiableException(); + } + + @Override + public void add(final Object o) { + throw unmodifiableException(); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 54d06d11655..d4f27f47eb8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -19,9 +19,18 @@ package org.elasticsearch.ingest; +import java.io.IOException; +import java.io.InputStream; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; @@ -296,6 +305,7 @@ public final class ConfigurationUtils { } public static List readProcessorConfigs(List> processorConfigs, + ScriptService scriptService, Map processorFactories) throws Exception { Exception exception = null; List processors = new ArrayList<>(); @@ -303,7 +313,7 @@ public final class ConfigurationUtils { for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { try { - processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue())); + processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue())); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } @@ -356,13 +366,14 @@ public final class ConfigurationUtils { @SuppressWarnings("unchecked") public static Processor readProcessor(Map processorFactories, + ScriptService scriptService, String type, Object config) throws Exception { if (config instanceof Map) { - return readProcessor(processorFactories, type, (Map) config); + return readProcessor(processorFactories, scriptService, type, (Map) config); } else if (config instanceof String && "script".equals(type)) { Map normalizedScript = new HashMap<>(1); normalizedScript.put(ScriptType.INLINE.getParseField().getPreferredName(), config); - return readProcessor(processorFactories, type, normalizedScript); + return readProcessor(processorFactories, scriptService, type, normalizedScript); } else { throw newConfigurationException(type, null, null, "property isn't a map, but of type [" + config.getClass().getName() + "]"); @@ -370,15 +381,17 @@ public final class ConfigurationUtils { } public static Processor readProcessor(Map processorFactories, + ScriptService scriptService, String type, Map config) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); + Script conditionalScript = extractConditional(config); Processor.Factory factory = processorFactories.get(type); if (factory != null) { boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false); List> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); - List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories); if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY, @@ -392,14 +405,42 @@ public final class ConfigurationUtils { type, Arrays.toString(config.keySet().toArray())); } if (onFailureProcessors.size() > 0 || ignoreFailure) { - return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); - } else { - return processor; + processor = new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); } + if (conditionalScript != null) { + processor = new ConditionalProcessor(tag, conditionalScript, scriptService, processor); + } + return processor; } catch (Exception e) { throw newConfigurationException(type, tag, null, e); } } throw newConfigurationException(type, tag, null, "No processor type exists with name [" + type + "]"); } + + private static Script extractConditional(Map config) throws IOException { + Object scriptSource = config.remove("if"); + if (scriptSource != null) { + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent) + .map(normalizeScript(scriptSource)); + InputStream stream = BytesReference.bytes(builder).streamInput(); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + return Script.parse(parser); + } + } + return null; + } + + @SuppressWarnings("unchecked") + private static Map normalizeScript(Object scriptConfig) { + if (scriptConfig instanceof Map) { + return (Map) scriptConfig; + } else if (scriptConfig instanceof String) { + return Collections.singletonMap("source", scriptConfig); + } else { + throw newConfigurationException("conditional", null, "script", + "property isn't a map or string, but of type [" + scriptConfig.getClass().getName() + "]"); + } + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index eee14e95869..f0f5d76caab 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -71,6 +71,7 @@ public class IngestService implements ClusterStateApplier { public static final String NOOP_PIPELINE_NAME = "_none"; private final ClusterService clusterService; + private final ScriptService scriptService; private final Map processorFactories; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. // We know of all the processor factories when a node with all its plugin have been initialized. Also some @@ -85,6 +86,7 @@ public class IngestService implements ClusterStateApplier { Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { this.clusterService = clusterService; + this.scriptService = scriptService; this.processorFactories = processorFactories( ingestPlugins, new Processor.Parameters( @@ -116,6 +118,10 @@ public class IngestService implements ClusterStateApplier { return clusterService; } + public ScriptService getScriptService() { + return scriptService; + } + /** * Deletes the pipeline specified by id in the request. */ @@ -300,11 +306,12 @@ public class IngestService implements ClusterStateApplier { } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); + Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { for (Map.Entry entry : ingestInfos.entrySet()) { - if (entry.getValue().containsProcessor(processor.getType()) == false) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(type) == false && ConditionalProcessor.TYPE.equals(type) == false) { String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; exceptions.add( ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message) @@ -452,7 +459,10 @@ public class IngestService implements ClusterStateApplier { List exceptions = new ArrayList<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { - pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); + pipelines.put( + pipeline.getId(), + Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories, scriptService) + ); } catch (ElasticsearchParseException e) { pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); exceptions.add(e); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 37dd3f52cb7..0a8f9fbc0d8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import org.elasticsearch.script.ScriptService; /** * A pipeline is a list of {@link Processor} instances grouped under a unique id. @@ -52,14 +53,15 @@ public final class Pipeline { } public static Pipeline create(String id, Map config, - Map processorFactories) throws Exception { + Map processorFactories, ScriptService scriptService) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories); List> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + List onFailureProcessors = + ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories); if (config.isEmpty() == false) { throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); diff --git a/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java b/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java new file mode 100644 index 00000000000..27ce29b95dc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.script; + +import java.util.Map; + +/** + * A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}. + */ +public abstract class IngestConditionalScript { + + public static final String[] PARAMETERS = { "ctx" }; + + /** The context used to compile {@link IngestConditionalScript} factories. */ + public static final ScriptContext CONTEXT = new ScriptContext<>("processor_conditional", Factory.class); + + /** The generic runtime parameters for the script. */ + private final Map params; + + public IngestConditionalScript(Map params) { + this.params = params; + } + + /** Return the parameters for this script. */ + public Map getParams() { + return params; + } + + public abstract boolean execute(Map ctx); + + public interface Factory { + IngestConditionalScript newInstance(Map params); + } +} diff --git a/server/src/main/java/org/elasticsearch/script/ScriptModule.java b/server/src/main/java/org/elasticsearch/script/ScriptModule.java index f04e690fa42..1788d8c792b 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptModule.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptModule.java @@ -51,6 +51,7 @@ public class ScriptModule { BucketAggregationSelectorScript.CONTEXT, SignificantTermsHeuristicScoreScript.CONTEXT, IngestScript.CONTEXT, + IngestConditionalScript.CONTEXT, FilterScript.CONTEXT, SimilarityScript.CONTEXT, SimilarityWeightScript.CONTEXT, diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java new file mode 100644 index 00000000000..2cb13af7a28 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.core.Is.is; + +public class ConditionalProcessorTests extends ESTestCase { + + public void testChecksCondition() throws Exception { + String conditionalField = "field1"; + String scriptName = "conditionalScript"; + String trueValue = "truthy"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), + Collections.singletonMap( + Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine( + Script.DEFAULT_SCRIPT_LANG, + Collections.singletonMap( + scriptName, ctx -> trueValue.equals(ctx.get(conditionalField)) + ) + ) + ), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + Map document = new HashMap<>(); + ConditionalProcessor processor = new ConditionalProcessor( + randomAlphaOfLength(10), + new Script( + ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, + scriptName, Collections.emptyMap()), scriptService, + new Processor() { + @Override + public void execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.setFieldValue("foo", "bar"); + } + + @Override + public String getType() { + return null; + } + + @Override + public String getTag() { + return null; + } + }); + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, trueValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); + assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); + + String falseValue = "falsy"; + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); + assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + } + + @SuppressWarnings("unchecked") + public void testActsOnImmutableData() throws Exception { + assertMutatingCtxThrows(ctx -> ctx.remove("foo")); + assertMutatingCtxThrows(ctx -> ctx.put("foo", "bar")); + assertMutatingCtxThrows(ctx -> ((List)ctx.get("listField")).add("bar")); + assertMutatingCtxThrows(ctx -> ((List)ctx.get("listField")).remove("bar")); + } + + private static void assertMutatingCtxThrows(Consumer> mutation) throws Exception { + String scriptName = "conditionalScript"; + CompletableFuture expectedException = new CompletableFuture<>(); + ScriptService scriptService = new ScriptService(Settings.builder().build(), + Collections.singletonMap( + Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine( + Script.DEFAULT_SCRIPT_LANG, + Collections.singletonMap( + scriptName, ctx -> { + try { + mutation.accept(ctx); + } catch (Exception e) { + expectedException.complete(e); + } + return false; + } + ) + ) + ), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + Map document = new HashMap<>(); + ConditionalProcessor processor = new ConditionalProcessor( + randomAlphaOfLength(10), + new Script( + ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, + scriptName, Collections.emptyMap()), scriptService, null + ); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue("listField", new ArrayList<>()); + processor.execute(ingestDocument); + Exception e = expectedException.get(); + assertThat(e, instanceOf(UnsupportedOperationException.class)); + assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java b/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java index 61afd9ce2a4..f3a11a86e54 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -38,6 +39,9 @@ import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; public class ConfigurationUtilsTests extends ESTestCase { + + private final ScriptService scriptService = mock(ScriptService.class); + private Map config; @Before @@ -120,7 +124,7 @@ public class ConfigurationUtilsTests extends ESTestCase { config.add(Collections.singletonMap("test_processor", emptyConfig)); config.add(Collections.singletonMap("test_processor", emptyConfig)); - List result = ConfigurationUtils.readProcessorConfigs(config, registry); + List result = ConfigurationUtils.readProcessorConfigs(config, scriptService, registry); assertThat(result.size(), equalTo(2)); assertThat(result.get(0), sameInstance(processor)); assertThat(result.get(1), sameInstance(processor)); @@ -129,7 +133,7 @@ public class ConfigurationUtilsTests extends ESTestCase { unknownTaggedConfig.put("tag", "my_unknown"); config.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig)); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessorConfigs(config, registry)); + () -> ConfigurationUtils.readProcessorConfigs(config, scriptService, registry)); assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown"))); assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor"))); @@ -142,7 +146,10 @@ public class ConfigurationUtilsTests extends ESTestCase { Map secondUnknonwTaggedConfig = new HashMap<>(); secondUnknonwTaggedConfig.put("tag", "my_second_unknown"); config2.add(Collections.singletonMap("second_unknown_processor", secondUnknonwTaggedConfig)); - e = expectThrows(ElasticsearchParseException.class, () -> ConfigurationUtils.readProcessorConfigs(config2, registry)); + e = expectThrows( + ElasticsearchParseException.class, + () -> ConfigurationUtils.readProcessorConfigs(config2, scriptService, registry) + ); assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown"))); assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor"))); @@ -166,17 +173,17 @@ public class ConfigurationUtilsTests extends ESTestCase { }); Object emptyConfig = Collections.emptyMap(); - Processor processor1 = ConfigurationUtils.readProcessor(registry, "script", emptyConfig); + Processor processor1 = ConfigurationUtils.readProcessor(registry, scriptService, "script", emptyConfig); assertThat(processor1, sameInstance(processor)); Object inlineScript = "test_script"; - Processor processor2 = ConfigurationUtils.readProcessor(registry, "script", inlineScript); + Processor processor2 = ConfigurationUtils.readProcessor(registry, scriptService, "script", inlineScript); assertThat(processor2, sameInstance(processor)); Object invalidConfig = 12L; ElasticsearchParseException ex = expectThrows(ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessor(registry, "unknown_processor", invalidConfig)); + () -> ConfigurationUtils.readProcessor(registry, scriptService, "unknown_processor", invalidConfig)); assertThat(ex.getMessage(), equalTo("property isn't a map, but of type [" + invalidConfig.getClass().getName() + "]")); } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index cafdbcfb446..d6d7b4ffa81 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -32,11 +33,13 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class PipelineFactoryTests extends ESTestCase { private final Integer version = randomBoolean() ? randomInt() : null; private final String versionString = version != null ? Integer.toString(version) : null; + private final ScriptService scriptService = mock(ScriptService.class); public void testCreate() throws Exception { Map processorConfig0 = new HashMap<>(); @@ -48,7 +51,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -64,7 +67,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); try { - Pipeline.create("_id", pipelineConfig, Collections.emptyMap()); + Pipeline.create("_id", pipelineConfig, Collections.emptyMap(), scriptService); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -76,7 +79,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -91,7 +94,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -109,7 +112,10 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows( + ElasticsearchParseException.class, + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService) + ); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -121,7 +127,10 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows( + ElasticsearchParseException.class, + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService) + ); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -136,7 +145,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -156,7 +165,10 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows( + ElasticsearchParseException.class, + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService) + ); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -169,7 +181,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); diff --git a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java index 4e2b8259e6f..0ee5798efb3 100644 --- a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java @@ -96,6 +96,14 @@ public class MockScriptEngine implements ScriptEngine { } }; return context.factoryClazz.cast(factory); + } else if (context.instanceClazz.equals(IngestConditionalScript.class)) { + IngestConditionalScript.Factory factory = parameters -> new IngestConditionalScript(parameters) { + @Override + public boolean execute(Map ctx) { + return (boolean) script.apply(ctx); + } + }; + return context.factoryClazz.cast(factory); } else if (context.instanceClazz.equals(UpdateScript.class)) { UpdateScript.Factory factory = parameters -> new UpdateScript(parameters) { @Override