From 46774098d9fcdbf7b983c05b6afd76b425be7f60 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 5 Sep 2018 14:25:29 +0200 Subject: [PATCH] INGEST: Implement Drop Processor (#32278) * INGEST: Implement Drop Processor * Adjust Processor API * Implement Drop Processor * Closes #23726 --- .../common/AbstractStringProcessor.java | 5 +- .../ingest/common/AppendProcessor.java | 3 +- .../ingest/common/ConvertProcessor.java | 5 +- .../ingest/common/DateIndexNameProcessor.java | 3 +- .../ingest/common/DateProcessor.java | 3 +- .../ingest/common/DissectProcessor.java | 5 +- .../ingest/common/DotExpanderProcessor.java | 3 +- .../ingest/common/DropProcessor.java | 57 +++++++++++++++++++ .../ingest/common/FailProcessor.java | 2 +- .../ingest/common/ForEachProcessor.java | 13 +++-- .../ingest/common/GrokProcessor.java | 5 +- .../ingest/common/IngestCommonPlugin.java | 1 + .../ingest/common/JoinProcessor.java | 3 +- .../ingest/common/JsonProcessor.java | 3 +- .../ingest/common/KeyValueProcessor.java | 3 +- .../ingest/common/PipelineProcessor.java | 4 +- .../ingest/common/Processors.java | 1 + .../ingest/common/RemoveProcessor.java | 3 +- .../ingest/common/RenameProcessor.java | 5 +- .../ingest/common/ScriptProcessor.java | 3 +- .../ingest/common/SetProcessor.java | 3 +- .../ingest/common/SortProcessor.java | 3 +- .../ingest/common/SplitProcessor.java | 5 +- .../ingest/common/ForEachProcessorTests.java | 3 +- .../ingest/common/PipelineProcessorTests.java | 3 +- .../attachment/AttachmentProcessor.java | 5 +- .../ingest/geoip/GeoIpProcessor.java | 5 +- .../ingest/useragent/UserAgentProcessor.java | 5 +- .../action/bulk/TransportBulkAction.java | 57 ++++++++++++------- .../ingest/SimulateExecutionService.java | 5 +- .../ingest/TrackingResultProcessor.java | 3 +- .../ingest/CompoundProcessor.java | 11 ++-- .../ingest/ConditionalProcessor.java | 5 +- .../elasticsearch/ingest/IngestDocument.java | 4 +- .../elasticsearch/ingest/IngestService.java | 37 ++++++------ .../org/elasticsearch/ingest/Pipeline.java | 4 +- .../org/elasticsearch/ingest/Processor.java | 2 +- .../bulk/TransportBulkActionIngestTests.java | 12 ++-- .../ingest/ConditionalProcessorTests.java | 3 +- .../ingest/IngestServiceTests.java | 36 ++++++------ .../elasticsearch/ingest/TestProcessor.java | 3 +- .../monitoring/test/MockIngestPlugin.java | 3 +- .../ingest/SetSecurityUserProcessor.java | 3 +- 43 files changed, 235 insertions(+), 115 deletions(-) create mode 100644 modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java index 23c98ca1e0c..792e5e4ebed 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java @@ -57,16 +57,17 @@ abstract class AbstractStringProcessor extends AbstractProcessor { } @Override - public final void execute(IngestDocument document) { + public final IngestDocument execute(IngestDocument document) { String val = document.getFieldValue(field, String.class, ignoreMissing); if (val == null && ignoreMissing) { - return; + return document; } else if (val == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); } document.setFieldValue(targetField, process(val)); + return document; } protected abstract T process(String value); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index 0543ae8591f..058d1bf22d8 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -56,8 +56,9 @@ public final class AppendProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { ingestDocument.appendFieldValue(field, value); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java index 2e881b82b59..aca48efe6c1 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java @@ -173,12 +173,12 @@ public final class ConvertProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { Object oldValue = document.getFieldValue(field, Object.class, ignoreMissing); Object newValue; if (oldValue == null && ignoreMissing) { - return; + return document; } else if (oldValue == null) { throw new IllegalArgumentException("Field [" + field + "] is null, cannot be converted to type [" + convertType + "]"); } @@ -194,6 +194,7 @@ public final class ConvertProcessor extends AbstractProcessor { newValue = convertType.convert(oldValue); } document.setFieldValue(targetField, newValue); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java index 0d6253c88f9..4a88f15b641 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java @@ -63,7 +63,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { // Date can be specified as a string or long: Object obj = ingestDocument.getFieldValue(field, Object.class); String date = null; @@ -101,6 +101,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor { .append('>'); String dynamicIndexName = builder.toString(); ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java index 4a9654f8cd0..dd6e6006eeb 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java @@ -74,7 +74,7 @@ public final class DateProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { Object obj = ingestDocument.getFieldValue(field, Object.class); String value = null; if (obj != null) { @@ -98,6 +98,7 @@ public final class DateProcessor extends AbstractProcessor { } ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime)); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java index 58f04ccdd43..fa51d047e73 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java @@ -47,14 +47,15 @@ public final class DissectProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { String input = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (input == null && ignoreMissing) { - return; + return ingestDocument; } else if (input == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); } dissectParser.parse(input).forEach(ingestDocument::setFieldValue); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java index bfc32311733..0698f6ed0a6 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java @@ -41,7 +41,7 @@ public final class DotExpanderProcessor extends AbstractProcessor { @Override @SuppressWarnings("unchecked") - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String path; Map map; if (this.path != null) { @@ -75,6 +75,7 @@ public final class DotExpanderProcessor extends AbstractProcessor { Object value = map.remove(field); ingestDocument.setFieldValue(path, value); } + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java new file mode 100644 index 00000000000..a0eabe38979 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import java.util.Map; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + +/** + * Drop processor only returns {@code null} for the execution result to indicate that any document + * executed by it should not be indexed. + */ +public final class DropProcessor extends AbstractProcessor { + + public static final String TYPE = "drop"; + + private DropProcessor(final String tag) { + super(tag); + } + + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + return null; + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + @Override + public Processor create(final Map processorFactories, final String tag, + final Map config) { + return new DropProcessor(tag); + } + } +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java index b1f946c10a2..0b62fbf72c8 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java @@ -48,7 +48,7 @@ public final class FailProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { throw new FailProcessorException(document.renderTemplate(message)); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 31c0ae8cc3d..ad93298c646 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -63,24 +63,29 @@ public final class ForEachProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); if (values == null) { if (ignoreMissing) { - return; + return ingestDocument; } throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); } List newValues = new ArrayList<>(values.size()); + IngestDocument document = ingestDocument; for (Object value : values) { Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); try { - processor.execute(ingestDocument); + document = processor.execute(document); + if (document == null) { + return null; + } } finally { newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue)); } } - ingestDocument.setFieldValue(field, newValues); + document.setFieldValue(field, newValues); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 88cba512b86..19883053d2a 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -54,11 +54,11 @@ public final class GrokProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String fieldValue = ingestDocument.getFieldValue(matchField, String.class, ignoreMissing); if (fieldValue == null && ignoreMissing) { - return; + return ingestDocument; } else if (fieldValue == null) { throw new IllegalArgumentException("field [" + matchField + "] is null, cannot process it."); } @@ -81,6 +81,7 @@ public final class GrokProcessor extends AbstractProcessor { ingestDocument.setFieldValue(PATTERN_MATCH_KEY, "0"); } } + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 8b048282814..d9dba2cc100 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -84,6 +84,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory()); + processors.put(DropProcessor.TYPE, new DropProcessor.Factory()); return Collections.unmodifiableMap(processors); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java index 57216a71e02..f29a6888861 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java @@ -60,7 +60,7 @@ public final class JoinProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { List list = document.getFieldValue(field, List.class); if (list == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot join."); @@ -69,6 +69,7 @@ public final class JoinProcessor extends AbstractProcessor { .map(Object::toString) .collect(Collectors.joining(separator)); document.setFieldValue(targetField, joined); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java index c0a9d37abda..90a648347cd 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java @@ -107,12 +107,13 @@ public final class JsonProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) throws Exception { + public IngestDocument execute(IngestDocument document) throws Exception { if (addToRoot) { apply(document.getSourceAndMetadata(), field); } else { document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class))); } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java index 9cce3cedf3d..69c7e9ff751 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java @@ -188,8 +188,9 @@ public final class KeyValueProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { execution.accept(document); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java index 77ffdb91919..1958a3e5232 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java @@ -42,12 +42,12 @@ public class PipelineProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { Pipeline pipeline = ingestService.getPipeline(pipelineName); if (pipeline == null) { throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); } - ingestDocument.executePipeline(pipeline); + return ingestDocument.executePipeline(pipeline); } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java index 8a0b1529892..00209f55600 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java @@ -46,4 +46,5 @@ public final class Processors { public static String urlDecode(String value) { return URLDecodeProcessor.apply(value); } + } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java index 2b9eaa9a13d..6002abb9e67 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java @@ -52,7 +52,7 @@ public final class RemoveProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { if (ignoreMissing) { fields.forEach(field -> { String path = document.renderTemplate(field); @@ -63,6 +63,7 @@ public final class RemoveProcessor extends AbstractProcessor { } else { fields.forEach(document::removeField); } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java index a35a164ddd3..2abd920048f 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java @@ -59,11 +59,11 @@ public final class RenameProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { String path = document.renderTemplate(field); if (document.hasField(path, true) == false) { if (ignoreMissing) { - return; + return document; } else { throw new IllegalArgumentException("field [" + path + "] doesn't exist"); } @@ -86,6 +86,7 @@ public final class RenameProcessor extends AbstractProcessor { document.setFieldValue(path, value); throw e; } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java index 169b2ab646a..12ef53cdcfc 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java @@ -69,9 +69,10 @@ public final class ScriptProcessor extends AbstractProcessor { * @param document The Ingest document passed into the script context under the "ctx" object. */ @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT); factory.newInstance(script.getParams()).execute(document.getSourceAndMetadata()); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java index 7aefa288618..0af51e5b895 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java @@ -65,10 +65,11 @@ public final class SetProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) { document.setFieldValue(field, value); } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java index 7ff266efe6b..a29cc346524 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java @@ -94,7 +94,7 @@ public final class SortProcessor extends AbstractProcessor { @Override @SuppressWarnings("unchecked") - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { List> list = document.getFieldValue(field, List.class); if (list == null) { @@ -110,6 +110,7 @@ public final class SortProcessor extends AbstractProcessor { } document.setFieldValue(targetField, copy); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java index cdd90f937fd..96a765b5ba7 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java @@ -68,11 +68,11 @@ public final class SplitProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { String oldVal = document.getFieldValue(field, String.class, ignoreMissing); if (oldVal == null && ignoreMissing) { - return; + return document; } else if (oldVal == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot split."); } @@ -81,6 +81,7 @@ public final class SplitProcessor extends AbstractProcessor { List splitList = new ArrayList<>(strings.length); Collections.addAll(splitList, strings); document.setFieldValue(targetField, splitList); + return document; } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index ffc5bcd4ac9..282994d8eb3 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -154,9 +154,10 @@ public class ForEachProcessorTests extends ESTestCase { public void testRandom() throws Exception { Processor innerProcessor = new Processor() { @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class); ingestDocument.setFieldValue("_ingest._value", existingValue + "."); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java index 5baf3cf822d..3103fb0392e 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java @@ -45,8 +45,9 @@ public class PipelineProcessorTests extends ESTestCase { pipelineId, null, null, new CompoundProcessor(new Processor() { @Override - public void execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { invoked.complete(ingestDocument); + return ingestDocument; } @Override diff --git a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java index 9fb2debcb54..c8a24ad3c87 100644 --- a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java +++ b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java @@ -73,13 +73,13 @@ public final class AttachmentProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { Map additionalFields = new HashMap<>(); byte[] input = ingestDocument.getFieldValueAsBytes(field, ignoreMissing); if (input == null && ignoreMissing) { - return; + return ingestDocument; } else if (input == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot parse."); } @@ -164,6 +164,7 @@ public final class AttachmentProcessor extends AbstractProcessor { } ingestDocument.setFieldValue(targetField, additionalFields); + return ingestDocument; } @Override diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 366b6ffc1d2..b5dbf5a7f34 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -81,11 +81,11 @@ public final class GeoIpProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (ip == null && ignoreMissing) { - return; + return ingestDocument; } else if (ip == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information."); } @@ -120,6 +120,7 @@ public final class GeoIpProcessor extends AbstractProcessor { if (geoData.isEmpty() == false) { ingestDocument.setFieldValue(targetField, geoData); } + return ingestDocument; } @Override diff --git a/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java b/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java index 93f210c427b..6e7f588f0bd 100644 --- a/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java +++ b/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java @@ -63,11 +63,11 @@ public class UserAgentProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String userAgent = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (userAgent == null && ignoreMissing) { - return; + return ingestDocument; } else if (userAgent == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot parse user-agent."); } @@ -144,6 +144,7 @@ public class UserAgentProcessor extends AbstractProcessor { } ingestDocument.setFieldValue(targetField, uaDetails); + return ingestDocument; } /** To maintain compatibility with logstash-filter-useragent */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index ea4a5086d7b..a3d7d50f3e2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -37,6 +38,7 @@ import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -521,28 +523,30 @@ public class TransportBulkAction extends HandledTransportAction listener) { long ingestStartTimeInNanos = System.nanoTime(); BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", - indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); - bulkRequestModifier.markCurrentItemAsFailed(exception); - }, (exception) -> { - if (exception != null) { - logger.error("failed to execute pipeline for a bulk request", exception); - listener.onFailure(exception); - } else { - long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos); - BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); - ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener); - if (bulkRequest.requests().isEmpty()) { - // at this stage, the transport bulk action can't deal with a bulk request with no requests, - // so we stop and send an empty response back to the client. - // (this will happen if pre-processing all items in the bulk failed) - actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); + ingestService.executeBulkRequest(() -> bulkRequestModifier, + (indexRequest, exception) -> { + logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", + indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); + bulkRequestModifier.markCurrentItemAsFailed(exception); + }, (exception) -> { + if (exception != null) { + logger.error("failed to execute pipeline for a bulk request", exception); + listener.onFailure(exception); } else { - doExecute(task, bulkRequest, actionListener); + long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos); + BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); + ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener); + if (bulkRequest.requests().isEmpty()) { + // at this stage, the transport bulk action can't deal with a bulk request with no requests, + // so we stop and send an empty response back to the client. + // (this will happen if pre-processing all items in the bulk failed) + actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); + } else { + doExecute(task, bulkRequest, actionListener); + } } - } - }); + }, + indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); } static final class BulkRequestModifier implements Iterator> { @@ -604,6 +608,19 @@ public class TransportBulkAction extends HandledTransportAction responses = new ArrayList<>(); for (IngestDocument ingestDocument : request.getDocuments()) { - responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose())); + SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); + if (response != null) { + responses.add(response); + } } listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java index abf617ffb1a..04c0fe7ca49 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java @@ -42,7 +42,7 @@ public final class TrackingResultProcessor implements Processor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { try { actualProcessor.execute(ingestDocument); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); @@ -54,6 +54,7 @@ public final class TrackingResultProcessor implements Processor { } throw e; } + return ingestDocument; } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 3ab7c078cd7..f576667f441 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -94,17 +94,19 @@ public class CompoundProcessor implements Processor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { for (Processor processor : processors) { try { - processor.execute(ingestDocument); + if (processor.execute(ingestDocument) == null) { + return null; + } } catch (Exception e) { if (ignoreFailure) { continue; } ElasticsearchException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); + newCompoundProcessorException(e, processor.getType(), processor.getTag()); if (onFailureProcessors.isEmpty()) { throw compoundProcessorException; } else { @@ -113,6 +115,7 @@ public class CompoundProcessor implements Processor { } } } + return ingestDocument; } void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { @@ -149,7 +152,7 @@ public class CompoundProcessor implements Processor { } private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) { - if (e instanceof ElasticsearchException && ((ElasticsearchException)e).getHeader("processor_type") != null) { + if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) { return (ElasticsearchException) e; } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index d1eb651acae..b6f6612344a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -51,12 +51,13 @@ public class ConditionalProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestConditionalScript script = scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - processor.execute(ingestDocument); + return processor.execute(ingestDocument); } + return ingestDocument; } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index e218168eeb7..5f122358d0c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -644,11 +644,11 @@ public final class IngestDocument { * @param pipeline Pipeline to execute * @throws Exception On exception in pipeline execution */ - public void executePipeline(Pipeline pipeline) throws Exception { + public IngestDocument executePipeline(Pipeline pipeline) throws Exception { if (this.executedPipelines.add(pipeline) == false) { throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); } - pipeline.execute(this); + return pipeline.execute(this); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index f0f5d76caab..5623cf30f36 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -270,7 +270,7 @@ public class IngestService implements ClusterStateApplier { String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; Processor failureProcessor = new AbstractProcessor(tag) { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { throw new IllegalStateException(errorMessage); } @@ -323,7 +323,8 @@ public class IngestService implements ClusterStateApplier { } public void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, Consumer completionHandler) { + BiConsumer itemFailureHandler, Consumer completionHandler, + Consumer itemDroppedHandler) { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override @@ -351,7 +352,7 @@ public class IngestService implements ClusterStateApplier { if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } - innerExecute(indexRequest, pipeline); + innerExecute(indexRequest, pipeline, itemDroppedHandler); //this shouldn't be needed here but we do it for consistency with index api // which requires it to prevent double execution indexRequest.setPipeline(NOOP_PIPELINE_NAME); @@ -399,7 +400,7 @@ public class IngestService implements ClusterStateApplier { } } - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { + private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { if (pipeline.getProcessors().isEmpty()) { return; } @@ -419,20 +420,22 @@ public class IngestService implements ClusterStateApplier { VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument); - - Map metadataMap = ingestDocument.extractMetadata(); - //it's fine to set all metadata fields all the time, as ingest document holds their starting values - //before ingestion, which might also get modified during ingestion. - indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); - indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); - indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); - indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); - indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); - if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { - indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + if (pipeline.execute(ingestDocument) == null) { + itemDroppedHandler.accept(indexRequest); + } else { + Map metadataMap = ingestDocument.extractMetadata(); + //it's fine to set all metadata fields all the time, as ingest document holds their starting values + //before ingestion, which might also get modified during ingestion. + indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); + indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); + indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + } + indexRequest.source(ingestDocument.getSourceAndMetadata()); } - indexRequest.source(ingestDocument.getSourceAndMetadata()); } catch (Exception e) { totalStats.ingestFailed(); pipelineStats.ifPresent(StatsHolder::ingestFailed); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 0a8f9fbc0d8..9f13cb1280a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -77,8 +77,8 @@ public final class Pipeline { /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ - public void execute(IngestDocument ingestDocument) throws Exception { - compoundProcessor.execute(ingestDocument); + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + return compoundProcessor.execute(ingestDocument); } /** diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 15a26d37491..498ec3a7710 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -40,7 +40,7 @@ public interface Processor { /** * Introspect and potentially modify the incoming data. */ - void execute(IngestDocument ingestDocument) throws Exception; + IngestDocument execute(IngestDocument ingestDocument) throws Exception; /** * Gets the type of a processor diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 8b68d2b6bb9..7fdb12ff135 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -259,7 +259,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -293,7 +293,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -325,7 +325,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { action.execute(null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -369,7 +369,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { singleItemBulkWriteAction.execute(null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -417,7 +417,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -449,7 +449,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index 2cb13af7a28..12b4078ddf8 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -65,8 +65,9 @@ public class ConditionalProcessorTests extends ESTestCase { scriptName, Collections.emptyMap()), scriptService, new Processor() { @Override - public void execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { ingestDocument.setFieldValue("foo", "bar"); + return ingestDocument; } @Override diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 83a5bef4de2..e3f52f35b79 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -126,7 +126,7 @@ public class IngestServiceTests extends ESTestCase { @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(null); @@ -424,7 +424,7 @@ public class IngestServiceTests extends ESTestCase { IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> new AbstractProcessor("mock") { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { throw new IllegalStateException("error"); } @@ -453,7 +453,7 @@ public class IngestServiceTests extends ESTestCase { @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(null); @@ -481,7 +481,7 @@ public class IngestServiceTests extends ESTestCase { BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); + ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, times(1)).accept( argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { @Override @@ -514,7 +514,7 @@ public class IngestServiceTests extends ESTestCase { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); } @@ -532,7 +532,7 @@ public class IngestServiceTests extends ESTestCase { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); } @@ -560,14 +560,14 @@ public class IngestServiceTests extends ESTestCase { ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); } } - return null; + return ingestDocument; }).when(processor).execute(any()); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(processor).execute(any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); @@ -597,7 +597,7 @@ public class IngestServiceTests extends ESTestCase { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); @@ -624,7 +624,7 @@ public class IngestServiceTests extends ESTestCase { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); verify(completionHandler, times(1)).accept(null); } @@ -661,7 +661,7 @@ public class IngestServiceTests extends ESTestCase { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); @@ -707,7 +707,7 @@ public class IngestServiceTests extends ESTestCase { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { @Override @@ -741,7 +741,7 @@ public class IngestServiceTests extends ESTestCase { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); @@ -779,7 +779,7 @@ public class IngestServiceTests extends ESTestCase { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); @@ -787,7 +787,7 @@ public class IngestServiceTests extends ESTestCase { assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); @@ -827,8 +827,9 @@ public class IngestServiceTests extends ESTestCase { String value = (String) config.remove("value"); return new Processor() { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { ingestDocument.setFieldValue(field, value); + return ingestDocument; } @Override @@ -846,8 +847,9 @@ public class IngestServiceTests extends ESTestCase { String field = (String) config.remove("field"); return new Processor() { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { ingestDocument.removeField(field); + return ingestDocument; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java b/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java index 4e4c5a24c0c..a1feb3e1f73 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java @@ -45,9 +45,10 @@ public class TestProcessor implements Processor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { invokedCounter.incrementAndGet(); ingestDocumentConsumer.accept(ingestDocument); + return ingestDocument; } @Override diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java index 818ab374d34..b4521ad58b2 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java @@ -74,8 +74,9 @@ public class MockIngestPlugin extends Plugin implements IngestPlugin { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { // mock processor does nothing + return ingestDocument; } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java index 15ac88b4d94..0c30af1879c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java @@ -43,7 +43,7 @@ public final class SetSecurityUserProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { Authentication authentication = Authentication.getAuthentication(threadContext); if (authentication == null) { throw new IllegalStateException("No user authenticated, only use this processor via authenticated user"); @@ -86,6 +86,7 @@ public final class SetSecurityUserProcessor extends AbstractProcessor { } } ingestDocument.setFieldValue(field, userObject); + return ingestDocument; } @Override