diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 6432ccb3ddd..c77ef0f9d5b 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -1037,11 +1037,6 @@ - - - - - diff --git a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java index fb10f8a1927..652a2c184a2 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java @@ -38,19 +38,25 @@ public class CompoundProcessor implements Processor { public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; + private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; public CompoundProcessor(Processor... processor) { - this(Arrays.asList(processor), Collections.emptyList()); + this(false, Arrays.asList(processor), Collections.emptyList()); } - public CompoundProcessor(List processors, List onFailureProcessors) { + public CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors) { super(); + this.ignoreFailure = ignoreFailure; this.processors = processors; this.onFailureProcessors = onFailureProcessors; } + public boolean isIgnoreFailure() { + return ignoreFailure; + } + public List getOnFailureProcessors() { return onFailureProcessors; } @@ -93,6 +99,10 @@ public class CompoundProcessor implements Processor { try { processor.execute(ingestDocument); } catch (Exception e) { + if (ignoreFailure) { + continue; + } + ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag()); if (onFailureProcessors.isEmpty()) { throw compoundProcessorException; diff --git a/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java index 685f6f41de3..20370430bdc 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java @@ -250,6 +250,7 @@ public final class ConfigurationUtils { private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map config) throws Exception { Processor.Factory factory = processorRegistry.getProcessorFactory(type); if (factory != null) { + boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false); List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); @@ -260,10 +261,11 @@ public final class ConfigurationUtils { throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}", type, Arrays.toString(config.keySet().toArray())); } - if (onFailureProcessors.isEmpty()) { + if (onFailureProcessors.size() > 0 || ignoreFailure) { + return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); + } else { return processor; } - return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); } throw new ElasticsearchParseException("No processor type exists with name [" + type + "]"); } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java index aaae929e0a9..d0b1ca7f6cf 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java @@ -109,7 +109,7 @@ public final class Pipeline { if (config.isEmpty() == false) { throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); } - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); return new Pipeline(id, description, compoundProcessor); } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java index af820318d83..861b0d68d2d 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java @@ -83,7 +83,7 @@ public final class TrackingResultProcessor implements Processor { onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList)); } } - return new CompoundProcessor(processors, onFailureProcessors); + return new CompoundProcessor(false, processors, onFailureProcessors); } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 30c29f8db5d..f7f2f0212dd 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -128,7 +128,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {}); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", - new CompoundProcessor(new CompoundProcessor(Collections.singletonList(processor1), + new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2)), processor3)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(processor1.getInvokedCounter(), equalTo(1)); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 254057d2ede..1b96939eb38 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -98,7 +98,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); + IndexRequest indexRequest2 = + new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") BiConsumer failureHandler = mock(BiConsumer.class); @@ -192,7 +193,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getTag()).thenReturn("mock_processor_tag"); Processor onFailureProcessor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @@ -208,7 +210,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteFailureWithOnFailure() throws Exception { Processor processor = mock(Processor.class); Processor onFailureProcessor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @@ -227,8 +230,9 @@ public class PipelineExecutionServiceTests extends ESTestCase { Processor processor = mock(Processor.class); Processor onFailureProcessor = mock(Processor.class); Processor onFailureOnFailureProcessor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), - Collections.singletonList(new CompoundProcessor(Collections.singletonList(onFailureProcessor), Collections.singletonList(onFailureOnFailureProcessor)))); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(false, Collections.singletonList(onFailureProcessor), + Collections.singletonList(onFailureOnFailureProcessor)))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 10321e30301..c60bb3eecde 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -61,7 +61,8 @@ public class PipelineStoreTests extends ESTestCase { store = new PipelineStore(Settings.EMPTY); ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder(); registryBuilder.registerProcessor("set", (templateService, registry) -> new SetProcessor.Factory(TestTemplateService.instance())); - registryBuilder.registerProcessor("remove", (templateService, registry) -> new RemoveProcessor.Factory(TestTemplateService.instance())); + registryBuilder.registerProcessor("remove", (templateService, registry) -> + new RemoveProcessor.Factory(TestTemplateService.instance())); store.buildProcessorFactoryRegistry(registryBuilder, null); } @@ -190,7 +191,8 @@ public class PipelineStoreTests extends ESTestCase { assertThat(pipeline, nullValue()); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); + PutPipelineRequest putRequest = new PutPipelineRequest(id, + new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); clusterState = store.innerPut(putRequest, clusterState); store.innerUpdatePipelines(clusterState); pipeline = store.get(id); @@ -208,7 +210,8 @@ public class PipelineStoreTests extends ESTestCase { } public void testValidate() throws Exception { - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}},{\"remove\" : {\"field\": \"_field\"}}]}")); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}},{\"remove\" : {\"field\": \"_field\"}}]}")); DiscoveryNode node1 = new DiscoveryNode("_node_id1", new LocalTransportAddress("_id"), emptyMap(), emptySet(), Version.CURRENT); @@ -230,7 +233,8 @@ public class PipelineStoreTests extends ESTestCase { } public void testValidateNoIngestInfo() throws Exception { - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); try { store.validatePipeline(Collections.emptyMap(), putRequest); fail("exception expected"); diff --git a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java index b4ee7eca07c..0e110288597 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,6 +32,7 @@ import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; public class CompoundProcessorTests extends ESTestCase { private IngestDocument ingestDocument; @@ -51,7 +53,7 @@ public class CompoundProcessorTests extends ESTestCase { TestProcessor processor = new TestProcessor(ingestDocument -> {}); CompoundProcessor compoundProcessor = new CompoundProcessor(processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); compoundProcessor.execute(ingestDocument); assertThat(processor.getInvokedCounter(), equalTo(1)); @@ -61,7 +63,7 @@ public class CompoundProcessorTests extends ESTestCase { TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); CompoundProcessor compoundProcessor = new CompoundProcessor(processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); try { compoundProcessor.execute(ingestDocument); @@ -72,6 +74,16 @@ public class CompoundProcessorTests extends ESTestCase { assertThat(processor.getInvokedCounter(), equalTo(1)); } + public void testIgnoreFailure() throws Exception { + TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); + TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");}); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList()); + compoundProcessor.execute(ingestDocument); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertThat(processor2.getInvokedCounter(), equalTo(1)); + assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value")); + } + public void testSingleProcessorWithOnFailureProcessor() throws Exception { TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> { @@ -82,7 +94,8 @@ public class CompoundProcessorTests extends ESTestCase { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); }); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), + Collections.singletonList(processor2)); compoundProcessor.execute(ingestDocument); assertThat(processor1.getInvokedCounter(), equalTo(1)); @@ -106,8 +119,10 @@ public class CompoundProcessorTests extends ESTestCase { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); }); - CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor)); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor)); + CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), + Collections.singletonList(lastProcessor)); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), + Collections.singletonList(compoundOnFailProcessor)); compoundProcessor.execute(ingestDocument); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); @@ -126,7 +141,7 @@ public class CompoundProcessorTests extends ESTestCase { CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor)); compoundProcessor.execute(ingestDocument); @@ -136,7 +151,8 @@ public class CompoundProcessorTests extends ESTestCase { public void testCompoundProcessorExceptionFail() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); - TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + TestProcessor failProcessor = + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -145,10 +161,10 @@ public class CompoundProcessorTests extends ESTestCase { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor), + CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), Collections.singletonList(failProcessor)); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor)); compoundProcessor.execute(ingestDocument); @@ -158,7 +174,8 @@ public class CompoundProcessorTests extends ESTestCase { public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); - TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + TestProcessor failProcessor = + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -167,10 +184,10 @@ public class CompoundProcessorTests extends ESTestCase { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor), + CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), Collections.singletonList(new CompoundProcessor(failProcessor))); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor)); compoundProcessor.execute(ingestDocument); diff --git a/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java index 537d8f020e6..449f1836f2b 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.prefs.PreferencesFactory; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class PipelineFactoryTests extends ESTestCase { @@ -45,7 +46,8 @@ public class PipelineFactoryTests extends ESTestCase { processorConfig0.put(AbstractProcessorFactory.TAG_KEY, "first-processor"); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); - pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); + pipelineConfig.put(Pipeline.PROCESSORS_KEY, + Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); Pipeline.Factory factory = new Pipeline.Factory(); ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); @@ -87,6 +89,28 @@ public class PipelineFactoryTests extends ESTestCase { assertThat(pipeline.getOnFailureProcessors().get(0).getType(), equalTo("test-processor")); } + public void testCreateWithPipelineIgnoreFailure() throws Exception { + Map processorConfig = new HashMap<>(); + processorConfig.put("ignore_failure", true); + + ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Pipeline.Factory factory = new Pipeline.Factory(); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.PROCESSORS_KEY, + Collections.singletonList(Collections.singletonMap("test", processorConfig))); + + Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getOnFailureProcessors().size(), equalTo(0)); + + CompoundProcessor processor = (CompoundProcessor) pipeline.getProcessors().get(0); + assertThat(processor.isIgnoreFailure(), is(true)); + assertThat(processor.getProcessors().get(0).getType(), equalTo("test-processor")); + } + public void testCreateUnusedProcessorOptions() throws Exception { Map processorConfig = new HashMap<>(); processorConfig.put("unused", "value"); @@ -121,7 +145,8 @@ public class PipelineFactoryTests extends ESTestCase { public void testFlattenProcessors() throws Exception { TestProcessor testProcessor = new TestProcessor(ingestDocument -> {}); CompoundProcessor processor1 = new CompoundProcessor(testProcessor, testProcessor); - CompoundProcessor processor2 = new CompoundProcessor(Collections.singletonList(testProcessor), Collections.singletonList(testProcessor)); + CompoundProcessor processor2 = + new CompoundProcessor(false, Collections.singletonList(testProcessor), Collections.singletonList(testProcessor)); Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2)); List flattened = pipeline.flattenAllProcessors(); assertThat(flattened.size(), equalTo(4)); diff --git a/core/src/test/java/org/elasticsearch/ingest/core/ValueSourceTests.java b/core/src/test/java/org/elasticsearch/ingest/core/ValueSourceTests.java index f2aa9f32bcd..57a6eb7ee68 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/ValueSourceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/ValueSourceTests.java @@ -51,7 +51,8 @@ public class ValueSourceTests extends ESTestCase { myPreciousMap.put("field2", "value2"); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousMap, TestTemplateService.instance())); + ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), + ValueSource.wrap(myPreciousMap, TestTemplateService.instance())); ingestDocument.removeField("field1.field2"); assertThat(myPreciousMap.size(), equalTo(1)); @@ -63,7 +64,8 @@ public class ValueSourceTests extends ESTestCase { myPreciousList.add("value"); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousList, TestTemplateService.instance())); + ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), + ValueSource.wrap(myPreciousList, TestTemplateService.instance())); ingestDocument.removeField("field1.0"); assertThat(myPreciousList.size(), equalTo(1)); diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java index 9f76a32e0c2..d1c2c6f1777 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java @@ -91,7 +91,7 @@ public class ForEachProcessorTests extends ESTestCase { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", - Collections.singletonList(new CompoundProcessor(Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))) + Collections.singletonList(new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))) ); processor.execute(ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java index 2b53a9d08bc..e19a4df05c9 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java @@ -91,8 +91,8 @@ public class TrackingResultProcessorTests extends ESTestCase { RuntimeException exception = new RuntimeException("fail"); TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); - CompoundProcessor actualProcessor = new CompoundProcessor( - Arrays.asList(new CompoundProcessor( + CompoundProcessor actualProcessor = new CompoundProcessor(false, + Arrays.asList(new CompoundProcessor(false, Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 54a10f766fd..9cf51374ab4 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -566,6 +566,30 @@ the index to which failed documents get sent. } -------------------------------------------------- +Alternatively instead of defining behaviour in case of processor failure, it is also possible +to ignore a failure and continue with the next processor by specifying the `ignore_failure` setting. + +In case in the example below the field `foo` doesn't exist the failure will be caught and the pipeline +continues to execute, which in this case means that the pipeline does nothing. + +[source,js] +-------------------------------------------------- +{ + "description" : "my first pipeline with handled exceptions", + "processors" : [ + { + "rename" : { + "field" : "foo", + "target_field" : "bar", + "ignore_failure" : true + } + } + ] +} +-------------------------------------------------- + +The `ignore_failure` can be set on any processor and defaults to `false`. + [float] [[accessing-error-metadata]] === Accessing Error Metadata From Processors Handling Exceptions diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml index b0c6c53fe67..bc0a765e2c0 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml @@ -106,3 +106,43 @@ - match: { _source.field1: "value1" } - match: { _source.foofield: "exists" } - match: { _source.foofield2: "ran" } + +--- +"Test pipeline with ignore_failure in a processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "fail" : { + "message" : "_message", + "ignore_failure": true + } + }, + { + "set" : { + "field" : "field", + "value" : "value" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.field: "value" }