Ingest: Add conditional per processor (#32398)

* Ingest: Add conditional per processor
* closes #21248
This commit is contained in:
Armin Braun 2018-08-30 03:46:39 +02:00 committed by GitHub
parent d93b2a2e9a
commit cc4d7059bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 788 additions and 38 deletions

View File

@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.script.ScriptService;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
@ -96,6 +97,13 @@ public final class ForEachProcessor extends AbstractProcessor {
}
public static final class Factory implements Processor.Factory {
private final ScriptService scriptService;
Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}
@Override
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
@ -107,7 +115,8 @@ public final class ForEachProcessor extends AbstractProcessor {
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}
}

View File

@ -72,7 +72,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory());
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService));
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService));
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters)));

View File

@ -22,6 +22,7 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
@ -30,14 +31,17 @@ import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class ForEachProcessorFactoryTests extends ESTestCase {
private final ScriptService scriptService = mock(ScriptService.class);
public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -53,7 +57,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -71,7 +75,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -84,7 +88,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
}
public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
@ -97,7 +101,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
@ -105,7 +109,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
}
public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));

View File

@ -0,0 +1,81 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404
---
"Test conditional processor fulfilled condition":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"if" : "ctx.conditional_field == 'bar'",
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {bytes_source_field: "1kb", conditional_field: "bar"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- match: { _source.bytes_target_field: 1024 }
---
"Test conditional processor unfulfilled condition":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"if" : "ctx.conditional_field == 'foo'",
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {bytes_source_field: "1kb", conditional_field: "bar"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- is_false: _source.bytes_target_field

View File

@ -171,9 +171,11 @@ public class SimulatePipelineRequest extends ActionRequest implements ToXContent
return new Parsed(pipeline, ingestDocumentList, verbose);
}
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService pipelineStore) throws Exception {
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService()
);
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

View File

@ -0,0 +1,381 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
public class ConditionalProcessor extends AbstractProcessor {
static final String TYPE = "conditional";
private final Script condition;
private final ScriptService scriptService;
private final Processor processor;
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
super(tag);
this.condition = script;
this.scriptService = scriptService;
this.processor = processor;
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
processor.execute(ingestDocument);
}
}
@Override
public String getType() {
return TYPE;
}
private static Object wrapUnmodifiable(Object raw) {
// Wraps all mutable types that the JSON parser can create by immutable wrappers.
// Any inputs not wrapped are assumed to be immutable
if (raw instanceof Map) {
return new UnmodifiableIngestData((Map<String, Object>) raw);
} else if (raw instanceof List) {
return new UnmodifiableIngestList((List<Object>) raw);
} else if (raw instanceof byte[]) {
return ((byte[]) raw).clone();
}
return raw;
}
private static UnsupportedOperationException unmodifiableException() {
return new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported");
}
private static final class UnmodifiableIngestData implements Map<String, Object> {
private final Map<String, Object> data;
UnmodifiableIngestData(Map<String, Object> data) {
this.data = data;
}
@Override
public int size() {
return data.size();
}
@Override
public boolean isEmpty() {
return data.isEmpty();
}
@Override
public boolean containsKey(final Object key) {
return data.containsKey(key);
}
@Override
public boolean containsValue(final Object value) {
return data.containsValue(value);
}
@Override
public Object get(final Object key) {
return wrapUnmodifiable(data.get(key));
}
@Override
public Object put(final String key, final Object value) {
throw unmodifiableException();
}
@Override
public Object remove(final Object key) {
throw unmodifiableException();
}
@Override
public void putAll(final Map<? extends String, ?> m) {
throw unmodifiableException();
}
@Override
public void clear() {
throw unmodifiableException();
}
@Override
public Set<String> keySet() {
return Collections.unmodifiableSet(data.keySet());
}
@Override
public Collection<Object> values() {
return new UnmodifiableIngestList(new ArrayList<>(data.values()));
}
@Override
public Set<Entry<String, Object>> entrySet() {
return data.entrySet().stream().map(entry ->
new Entry<String, Object>() {
@Override
public String getKey() {
return entry.getKey();
}
@Override
public Object getValue() {
return wrapUnmodifiable(entry.getValue());
}
@Override
public Object setValue(final Object value) {
throw unmodifiableException();
}
@Override
public boolean equals(final Object o) {
return entry.equals(o);
}
@Override
public int hashCode() {
return entry.hashCode();
}
}).collect(Collectors.toSet());
}
}
private static final class UnmodifiableIngestList implements List<Object> {
private final List<Object> data;
UnmodifiableIngestList(List<Object> data) {
this.data = data;
}
@Override
public int size() {
return data.size();
}
@Override
public boolean isEmpty() {
return data.isEmpty();
}
@Override
public boolean contains(final Object o) {
return data.contains(o);
}
@Override
public Iterator<Object> iterator() {
Iterator<Object> wrapped = data.iterator();
return new Iterator<Object>() {
@Override
public boolean hasNext() {
return wrapped.hasNext();
}
@Override
public Object next() {
return wrapped.next();
}
@Override
public void remove() {
throw unmodifiableException();
}
};
}
@Override
public Object[] toArray() {
Object[] wrapped = data.toArray(new Object[0]);
for (int i = 0; i < wrapped.length; i++) {
wrapped[i] = wrapUnmodifiable(wrapped[i]);
}
return wrapped;
}
@Override
public <T> T[] toArray(final T[] a) {
Object[] raw = data.toArray(new Object[0]);
T[] wrapped = (T[]) Arrays.copyOf(raw, a.length, a.getClass());
for (int i = 0; i < wrapped.length; i++) {
wrapped[i] = (T) wrapUnmodifiable(wrapped[i]);
}
return wrapped;
}
@Override
public boolean add(final Object o) {
throw unmodifiableException();
}
@Override
public boolean remove(final Object o) {
throw unmodifiableException();
}
@Override
public boolean containsAll(final Collection<?> c) {
return data.contains(c);
}
@Override
public boolean addAll(final Collection<?> c) {
throw unmodifiableException();
}
@Override
public boolean addAll(final int index, final Collection<?> c) {
throw unmodifiableException();
}
@Override
public boolean removeAll(final Collection<?> c) {
throw unmodifiableException();
}
@Override
public boolean retainAll(final Collection<?> c) {
throw unmodifiableException();
}
@Override
public void clear() {
throw unmodifiableException();
}
@Override
public Object get(final int index) {
return wrapUnmodifiable(data.get(index));
}
@Override
public Object set(final int index, final Object element) {
throw unmodifiableException();
}
@Override
public void add(final int index, final Object element) {
throw unmodifiableException();
}
@Override
public Object remove(final int index) {
throw unmodifiableException();
}
@Override
public int indexOf(final Object o) {
return data.indexOf(o);
}
@Override
public int lastIndexOf(final Object o) {
return data.lastIndexOf(o);
}
@Override
public ListIterator<Object> listIterator() {
return new UnmodifiableListIterator(data.listIterator());
}
@Override
public ListIterator<Object> listIterator(final int index) {
return new UnmodifiableListIterator(data.listIterator(index));
}
@Override
public List<Object> subList(final int fromIndex, final int toIndex) {
return new UnmodifiableIngestList(data.subList(fromIndex, toIndex));
}
private static final class UnmodifiableListIterator implements ListIterator<Object> {
private final ListIterator<Object> data;
UnmodifiableListIterator(ListIterator<Object> data) {
this.data = data;
}
@Override
public boolean hasNext() {
return data.hasNext();
}
@Override
public Object next() {
return wrapUnmodifiable(data.next());
}
@Override
public boolean hasPrevious() {
return data.hasPrevious();
}
@Override
public Object previous() {
return wrapUnmodifiable(data.previous());
}
@Override
public int nextIndex() {
return data.nextIndex();
}
@Override
public int previousIndex() {
return data.previousIndex();
}
@Override
public void remove() {
throw unmodifiableException();
}
@Override
public void set(final Object o) {
throw unmodifiableException();
}
@Override
public void add(final Object o) {
throw unmodifiableException();
}
}
}
}

View File

@ -19,9 +19,18 @@
package org.elasticsearch.ingest;
import java.io.IOException;
import java.io.InputStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
@ -296,6 +305,7 @@ public final class ConfigurationUtils {
}
public static List<Processor> readProcessorConfigs(List<Map<String, Object>> processorConfigs,
ScriptService scriptService,
Map<String, Processor.Factory> processorFactories) throws Exception {
Exception exception = null;
List<Processor> processors = new ArrayList<>();
@ -303,7 +313,7 @@ public final class ConfigurationUtils {
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
try {
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue()));
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
@ -356,13 +366,14 @@ public final class ConfigurationUtils {
@SuppressWarnings("unchecked")
public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
ScriptService scriptService,
String type, Object config) throws Exception {
if (config instanceof Map) {
return readProcessor(processorFactories, type, (Map<String, Object>) config);
return readProcessor(processorFactories, scriptService, type, (Map<String, Object>) config);
} else if (config instanceof String && "script".equals(type)) {
Map<String, Object> normalizedScript = new HashMap<>(1);
normalizedScript.put(ScriptType.INLINE.getParseField().getPreferredName(), config);
return readProcessor(processorFactories, type, normalizedScript);
return readProcessor(processorFactories, scriptService, type, normalizedScript);
} else {
throw newConfigurationException(type, null, null,
"property isn't a map, but of type [" + config.getClass().getName() + "]");
@ -370,15 +381,17 @@ public final class ConfigurationUtils {
}
public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
ScriptService scriptService,
String type, Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
Script conditionalScript = extractConditional(config);
Processor.Factory factory = processorFactories.get(type);
if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Object>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories);
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY,
@ -392,14 +405,42 @@ public final class ConfigurationUtils {
type, Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.size() > 0 || ignoreFailure) {
return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
} else {
return processor;
processor = new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
}
if (conditionalScript != null) {
processor = new ConditionalProcessor(tag, conditionalScript, scriptService, processor);
}
return processor;
} catch (Exception e) {
throw newConfigurationException(type, tag, null, e);
}
}
throw newConfigurationException(type, tag, null, "No processor type exists with name [" + type + "]");
}
private static Script extractConditional(Map<String, Object> config) throws IOException {
Object scriptSource = config.remove("if");
if (scriptSource != null) {
try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)
.map(normalizeScript(scriptSource));
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, stream)) {
return Script.parse(parser);
}
}
return null;
}
@SuppressWarnings("unchecked")
private static Map<String, Object> normalizeScript(Object scriptConfig) {
if (scriptConfig instanceof Map<?, ?>) {
return (Map<String, Object>) scriptConfig;
} else if (scriptConfig instanceof String) {
return Collections.singletonMap("source", scriptConfig);
} else {
throw newConfigurationException("conditional", null, "script",
"property isn't a map or string, but of type [" + scriptConfig.getClass().getName() + "]");
}
}
}

View File

@ -71,6 +71,7 @@ public class IngestService implements ClusterStateApplier {
public static final String NOOP_PIPELINE_NAME = "_none";
private final ClusterService clusterService;
private final ScriptService scriptService;
private final Map<String, Processor.Factory> processorFactories;
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
@ -85,6 +86,7 @@ public class IngestService implements ClusterStateApplier {
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
this.clusterService = clusterService;
this.scriptService = scriptService;
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
@ -116,6 +118,10 @@ public class IngestService implements ClusterStateApplier {
return clusterService;
}
public ScriptService getScriptService() {
return scriptService;
}
/**
* Deletes the pipeline specified by id in the request.
*/
@ -300,11 +306,12 @@ public class IngestService implements ClusterStateApplier {
}
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories);
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);
List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
if (entry.getValue().containsProcessor(processor.getType()) == false) {
String type = processor.getType();
if (entry.getValue().containsProcessor(type) == false && ConditionalProcessor.TYPE.equals(type) == false) {
String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
exceptions.add(
ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)
@ -452,7 +459,10 @@ public class IngestService implements ClusterStateApplier {
List<ElasticsearchParseException> exceptions = new ArrayList<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
pipelines.put(
pipeline.getId(),
Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories, scriptService)
);
} catch (ElasticsearchParseException e) {
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
exceptions.add(e);

View File

@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.elasticsearch.script.ScriptService;
/**
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
@ -52,14 +53,15 @@ public final class Pipeline {
}
public static Pipeline create(String id, Map<String, Object> config,
Map<String, Processor.Factory> processorFactories) throws Exception {
Map<String, Processor.Factory> processorFactories, ScriptService scriptService) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories);
List<Map<String, Object>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
List<Processor> onFailureProcessors =
ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id +
"] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import java.util.Map;
/**
* A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}.
*/
public abstract class IngestConditionalScript {
public static final String[] PARAMETERS = { "ctx" };
/** The context used to compile {@link IngestConditionalScript} factories. */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("processor_conditional", Factory.class);
/** The generic runtime parameters for the script. */
private final Map<String, Object> params;
public IngestConditionalScript(Map<String, Object> params) {
this.params = params;
}
/** Return the parameters for this script. */
public Map<String, Object> getParams() {
return params;
}
public abstract boolean execute(Map<String, Object> ctx);
public interface Factory {
IngestConditionalScript newInstance(Map<String, Object> params);
}
}

View File

@ -51,6 +51,7 @@ public class ScriptModule {
BucketAggregationSelectorScript.CONTEXT,
SignificantTermsHeuristicScoreScript.CONTEXT,
IngestScript.CONTEXT,
IngestConditionalScript.CONTEXT,
FilterScript.CONTEXT,
SimilarityScript.CONTEXT,
SimilarityWeightScript.CONTEXT,

View File

@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
public class ConditionalProcessorTests extends ESTestCase {
public void testChecksCondition() throws Exception {
String conditionalField = "field1";
String scriptName = "conditionalScript";
String trueValue = "truthy";
ScriptService scriptService = new ScriptService(Settings.builder().build(),
Collections.singletonMap(
Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(
Script.DEFAULT_SCRIPT_LANG,
Collections.singletonMap(
scriptName, ctx -> trueValue.equals(ctx.get(conditionalField))
)
)
),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
Map<String, Object> document = new HashMap<>();
ConditionalProcessor processor = new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
scriptName, Collections.emptyMap()), scriptService,
new Processor() {
@Override
public void execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue("foo", "bar");
}
@Override
public String getType() {
return null;
}
@Override
public String getTag() {
return null;
}
});
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
String falseValue = "falsy";
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
}
@SuppressWarnings("unchecked")
public void testActsOnImmutableData() throws Exception {
assertMutatingCtxThrows(ctx -> ctx.remove("foo"));
assertMutatingCtxThrows(ctx -> ctx.put("foo", "bar"));
assertMutatingCtxThrows(ctx -> ((List<Object>)ctx.get("listField")).add("bar"));
assertMutatingCtxThrows(ctx -> ((List<Object>)ctx.get("listField")).remove("bar"));
}
private static void assertMutatingCtxThrows(Consumer<Map<String, Object>> mutation) throws Exception {
String scriptName = "conditionalScript";
CompletableFuture<Exception> expectedException = new CompletableFuture<>();
ScriptService scriptService = new ScriptService(Settings.builder().build(),
Collections.singletonMap(
Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(
Script.DEFAULT_SCRIPT_LANG,
Collections.singletonMap(
scriptName, ctx -> {
try {
mutation.accept(ctx);
} catch (Exception e) {
expectedException.complete(e);
}
return false;
}
)
)
),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
Map<String, Object> document = new HashMap<>();
ConditionalProcessor processor = new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
scriptName, Collections.emptyMap()), scriptService, null
);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue("listField", new ArrayList<>());
processor.execute(ingestDocument);
Exception e = expectedException.get();
assertThat(e, instanceOf(UnsupportedOperationException.class));
assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage());
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -38,6 +39,9 @@ import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
public class ConfigurationUtilsTests extends ESTestCase {
private final ScriptService scriptService = mock(ScriptService.class);
private Map<String, Object> config;
@Before
@ -120,7 +124,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
config.add(Collections.singletonMap("test_processor", emptyConfig));
config.add(Collections.singletonMap("test_processor", emptyConfig));
List<Processor> result = ConfigurationUtils.readProcessorConfigs(config, registry);
List<Processor> result = ConfigurationUtils.readProcessorConfigs(config, scriptService, registry);
assertThat(result.size(), equalTo(2));
assertThat(result.get(0), sameInstance(processor));
assertThat(result.get(1), sameInstance(processor));
@ -129,7 +133,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
unknownTaggedConfig.put("tag", "my_unknown");
config.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> ConfigurationUtils.readProcessorConfigs(config, registry));
() -> ConfigurationUtils.readProcessorConfigs(config, scriptService, registry));
assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]"));
assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown")));
assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor")));
@ -142,7 +146,10 @@ public class ConfigurationUtilsTests extends ESTestCase {
Map<String, Object> secondUnknonwTaggedConfig = new HashMap<>();
secondUnknonwTaggedConfig.put("tag", "my_second_unknown");
config2.add(Collections.singletonMap("second_unknown_processor", secondUnknonwTaggedConfig));
e = expectThrows(ElasticsearchParseException.class, () -> ConfigurationUtils.readProcessorConfigs(config2, registry));
e = expectThrows(
ElasticsearchParseException.class,
() -> ConfigurationUtils.readProcessorConfigs(config2, scriptService, registry)
);
assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]"));
assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown")));
assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor")));
@ -166,17 +173,17 @@ public class ConfigurationUtilsTests extends ESTestCase {
});
Object emptyConfig = Collections.emptyMap();
Processor processor1 = ConfigurationUtils.readProcessor(registry, "script", emptyConfig);
Processor processor1 = ConfigurationUtils.readProcessor(registry, scriptService, "script", emptyConfig);
assertThat(processor1, sameInstance(processor));
Object inlineScript = "test_script";
Processor processor2 = ConfigurationUtils.readProcessor(registry, "script", inlineScript);
Processor processor2 = ConfigurationUtils.readProcessor(registry, scriptService, "script", inlineScript);
assertThat(processor2, sameInstance(processor));
Object invalidConfig = 12L;
ElasticsearchParseException ex = expectThrows(ElasticsearchParseException.class,
() -> ConfigurationUtils.readProcessor(registry, "unknown_processor", invalidConfig));
() -> ConfigurationUtils.readProcessor(registry, scriptService, "unknown_processor", invalidConfig));
assertThat(ex.getMessage(), equalTo("property isn't a map, but of type [" + invalidConfig.getClass().getName() + "]"));
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
@ -32,11 +33,13 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class PipelineFactoryTests extends ESTestCase {
private final Integer version = randomBoolean() ? randomInt() : null;
private final String versionString = version != null ? Integer.toString(version) : null;
private final ScriptService scriptService = mock(ScriptService.class);
public void testCreate() throws Exception {
Map<String, Object> processorConfig0 = new HashMap<>();
@ -48,7 +51,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry);
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
@ -64,7 +67,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
try {
Pipeline.create("_id", pipelineConfig, Collections.emptyMap());
Pipeline.create("_id", pipelineConfig, Collections.emptyMap(), scriptService);
fail("should fail, missing required [processors] field");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@ -76,7 +79,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList());
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null);
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
@ -91,7 +94,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry);
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
@ -109,7 +112,10 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry));
Exception e = expectThrows(
ElasticsearchParseException.class,
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService)
);
assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
}
@ -121,7 +127,10 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry));
Exception e = expectThrows(
ElasticsearchParseException.class,
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService)
);
assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
}
@ -136,7 +145,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry);
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
@ -156,7 +165,10 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry));
Exception e = expectThrows(
ElasticsearchParseException.class,
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService)
);
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
@ -169,7 +181,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry);
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));

View File

@ -96,6 +96,14 @@ public class MockScriptEngine implements ScriptEngine {
}
};
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(IngestConditionalScript.class)) {
IngestConditionalScript.Factory factory = parameters -> new IngestConditionalScript(parameters) {
@Override
public boolean execute(Map<String, Object> ctx) {
return (boolean) script.apply(ctx);
}
};
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(UpdateScript.class)) {
UpdateScript.Factory factory = parameters -> new UpdateScript(parameters) {
@Override