diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 009a67699e3..29ff0399509 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1075,9 +1075,10 @@ then it aborts the execution and leaves the array unmodified. .Foreach Options [options="header"] |====== -| Name | Required | Default | Description -| `field` | yes | - | The array field -| `processor` | yes | - | The processor to execute against each field +| Name | Required | Default | Description +| `field` | yes | - | The array field +| `processor` | yes | - | The processor to execute against each field +| `ignore_missing` | no | false | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document |====== Assume the following document: diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 2a1046acb9c..1c64fdb7408 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; +import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.elasticsearch.ingest.ConfigurationUtils.readMap; import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; @@ -47,16 +48,28 @@ public final class ForEachProcessor extends AbstractProcessor { private final String field; private final Processor processor; + private final boolean ignoreMissing; - ForEachProcessor(String tag, String field, Processor processor) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { super(tag); this.field = field; this.processor = processor; + this.ignoreMissing = ignoreMissing; + } + + boolean isIgnoreMissing() { + return ignoreMissing; } @Override public void execute(IngestDocument ingestDocument) throws Exception { - List values = ingestDocument.getFieldValue(field, List.class); + List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); + if (values == null) { + if (ignoreMissing) { + return; + } + throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); + } List newValues = new ArrayList<>(values.size()); for (Object value : values) { Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); @@ -87,6 +100,7 @@ public final class ForEachProcessor extends AbstractProcessor { public ForEachProcessor create(Map factories, String tag, Map config) throws Exception { String field = readStringProperty(TYPE, tag, config, "field"); + boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false); Map> processorConfig = readMap(TYPE, tag, config, "processor"); Set>> entries = processorConfig.entrySet(); if (entries.size() != 1) { @@ -94,7 +108,7 @@ public final class ForEachProcessor extends AbstractProcessor { } Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor); + return new ForEachProcessor(tag, field, processor, ignoreMissing); } } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 49611d76f40..f382ad8dcfb 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -46,6 +46,24 @@ public class ForEachProcessorFactoryTests extends ESTestCase { assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor.getField(), equalTo("_field")); assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + assertFalse(forEachProcessor.isIgnoreMissing()); + } + + public void testSetIgnoreMissing() throws Exception { + Processor processor = new TestProcessor(ingestDocument -> { }); + Map registry = new HashMap<>(); + registry.put("_name", (r, t, c) -> processor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); + config.put("ignore_missing", true); + ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config); + assertThat(forEachProcessor, Matchers.notNullValue()); + assertThat(forEachProcessor.getField(), equalTo("_field")); + assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + assertTrue(forEachProcessor.isIgnoreMissing()); } public void testCreateWithTooManyProcessorTypes() throws Exception { diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 07573a780a1..1491bd481bd 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,6 +19,13 @@ package org.elasticsearch.ingest.common; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -27,14 +34,7 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; - +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; public class ForEachProcessorTests extends ESTestCase { @@ -49,7 +49,8 @@ public class ForEachProcessorTests extends ESTestCase { ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value") + "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), + false ); processor.execute(ingestDocument); @@ -69,7 +70,7 @@ public class ForEachProcessorTests extends ESTestCase { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); try { processor.execute(ingestDocument); fail("exception expected"); @@ -89,7 +90,8 @@ public class ForEachProcessorTests extends ESTestCase { }); Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( - "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)) + "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), + false ); processor.execute(ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -109,7 +111,7 @@ public class ForEachProcessorTests extends ESTestCase { id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -137,7 +139,7 @@ public class ForEachProcessorTests extends ESTestCase { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other"))); + (model) -> model.get("other")), false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -174,7 +176,7 @@ public class ForEachProcessorTests extends ESTestCase { "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -199,7 +201,7 @@ public class ForEachProcessorTests extends ESTestCase { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - )); + ), false); processor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values", List.class); @@ -225,7 +227,7 @@ public class ForEachProcessorTests extends ESTestCase { TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); forEachProcessor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values", List.class); @@ -258,7 +260,7 @@ public class ForEachProcessorTests extends ESTestCase { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor)); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); processor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -270,4 +272,16 @@ public class ForEachProcessorTests extends ESTestCase { assertThat(result.get(1), equalTo("JKL")); } + public void testIgnoreMissing() throws Exception { + IngestDocument originalIngestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, Collections.emptyMap() + ); + IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); + TestProcessor testProcessor = new TestProcessor(doc -> {}); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); + processor.execute(ingestDocument); + assertIngestDocument(originalIngestDocument, ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(0)); + } + }