diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java index 7c92e4bbd53..b0e0a2a66a8 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,6 +22,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.processor.CompoundProcessor; import java.util.ArrayList; import java.util.Arrays; @@ -36,21 +37,19 @@ public final class Pipeline { private final String id; private final String description; - private final List processors; + private final CompoundProcessor compoundProcessor; - public Pipeline(String id, String description, List processors) { + public Pipeline(String id, String description, CompoundProcessor compoundProcessor) { this.id = id; this.description = description; - this.processors = processors; + this.compoundProcessor = compoundProcessor; } /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public void execute(IngestDocument ingestDocument) throws Exception { - for (Processor processor : processors) { - processor.execute(ingestDocument); - } + compoundProcessor.execute(ingestDocument); } /** @@ -71,33 +70,56 @@ public final class Pipeline { * Unmodifiable list containing each processor that operates on the data. */ public List getProcessors() { - return processors; + return compoundProcessor.getProcessors(); + } + + /** + * Unmodifiable list containing each on_failure processor that operates on the data in case of + * exception thrown in pipeline processors + */ + public List getOnFailureProcessors() { + return compoundProcessor.getOnFailureProcessors(); } public final static class Factory { + private Processor readProcessor(Map processorRegistry, String type, Map config) throws Exception { + Processor.Factory factory = processorRegistry.get(type); + if (factory != null) { + List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); + Processor processor = factory.create(config); + if (config.isEmpty() == false) { + throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + } + if (onFailureProcessors.isEmpty()) { + return processor; + } else { + return new CompoundProcessor(Arrays.asList(processor), onFailureProcessors); + } + } else { + throw new IllegalArgumentException("No processor type exist with name [" + type + "]"); + } + } - public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); - List processors = new ArrayList<>(); - @SuppressWarnings("unchecked") - List>> processorConfigs = (List>>) config.get("processors"); - if (processorConfigs != null ) { - for (Map> processor : processorConfigs) { - for (Map.Entry> entry : processor.entrySet()) { - Processor.Factory factory = processorRegistry.get(entry.getKey()); - if (factory != null) { - Map processorConfig = entry.getValue(); - processors.add(factory.create(processorConfig)); - if (processorConfig.isEmpty() == false) { - throw new IllegalArgumentException("processor [" + entry.getKey() + "] doesn't support one or more provided configuration parameters " + Arrays.toString(processorConfig.keySet().toArray())); - } - } else { - throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]"); - } + private List readProcessors(String fieldName, Map processorRegistry, Map config) throws Exception { + List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(config, fieldName); + List onFailureProcessors = new ArrayList<>(); + if (onFailureProcessorConfigs != null) { + for (Map> processorConfigWithKey : onFailureProcessorConfigs) { + for (Map.Entry> entry : processorConfigWithKey.entrySet()) { + onFailureProcessors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue())); } } } - return new Pipeline(id, description, Collections.unmodifiableList(processors)); + + return onFailureProcessors; + } + + public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { + String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); + List processors = readProcessors("processors", processorRegistry, config); + List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); + return new Pipeline(id, description, compoundProcessor); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/CompoundProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/CompoundProcessor.java new file mode 100644 index 00000000000..dbaad2e2313 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/CompoundProcessor.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.IngestDocument; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A Processor that executes a list of other "processors". It executes a separate list of + * "onFailureProcessors" when any of the processors throw an {@link Exception}. + */ +public class CompoundProcessor implements Processor { + + private final List processors; + private final List onFailureProcessors; + + public CompoundProcessor(Processor... processor) { + this(Arrays.asList(processor), Collections.emptyList()); + } + public CompoundProcessor(List processors, List onFailureProcessors) { + this.processors = processors; + this.onFailureProcessors = onFailureProcessors; + } + + public List getOnFailureProcessors() { + return onFailureProcessors; + } + + public List getProcessors() { + return processors; + } + + @Override + public String getType() { + return "compound[" + processors.stream().map(p -> p.getType()).collect(Collectors.joining(",")) + "]"; + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + try { + for (Processor processor : processors) { + processor.execute(ingestDocument); + } + } catch (Exception e) { + if (onFailureProcessors.isEmpty()) { + throw e; + } else { + executeOnFailure(ingestDocument); + } + } + + } + + void executeOnFailure(IngestDocument ingestDocument) throws Exception { + for (Processor processor : onFailureProcessors) { + processor.execute(ingestDocument); + } + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 459f7a62869..5d61f11ac24 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -52,6 +52,29 @@ public class PipelineFactoryTests extends ESTestCase { assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor")); } + public void testCreateWithPipelineOnFailure() throws Exception { + Map processorConfig = new HashMap<>(); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("description", "_description"); + pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig))); + pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig))); + Pipeline.Factory factory = new Pipeline.Factory(); + Map processorRegistry = new HashMap<>(); + Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("test-processor"); + Processor.Factory processorFactory = mock(Processor.Factory.class); + when(processorFactory.create(processorConfig)).thenReturn(processor); + processorRegistry.put("test-processor", processorFactory); + + Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor")); + assertThat(pipeline.getOnFailureProcessors().size(), equalTo(1)); + assertThat(pipeline.getOnFailureProcessors().get(0).getType(), equalTo("test-processor")); + } + public void testCreateUnusedProcessorOptions() throws Exception { Map processorConfig = new HashMap<>(); processorConfig.put("unused", "value"); @@ -71,4 +94,26 @@ public class PipelineFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } } + + public void testCreateProcessorsWithOnFailureProperties() throws Exception { + Map processorConfig = new HashMap<>(); + processorConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", new HashMap<>()))); + + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("description", "_description"); + pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig))); + Pipeline.Factory factory = new Pipeline.Factory(); + Map processorFactoryStore = new HashMap<>(); + Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("test-processor"); + Processor.Factory processorFactory = mock(Processor.Factory.class); + when(processorFactory.create(processorConfig)).thenReturn(processor); + processorFactoryStore.put("test", processorFactory); + + Pipeline pipeline = factory.create("_id", pipelineConfig, processorFactoryStore); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound[test-processor]")); + } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/CompoundProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/CompoundProcessorTests.java new file mode 100644 index 00000000000..efb49e4c6dd --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/CompoundProcessorTests.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashMap; + +import static org.elasticsearch.mock.orig.Mockito.verify; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; + +public class CompoundProcessorTests extends ESTestCase { + private IngestDocument ingestDocument; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + } + + public void testEmpty() throws Exception { + CompoundProcessor processor = new CompoundProcessor(); + assertThat(processor.getProcessors().isEmpty(), is(true)); + assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); + processor.execute(ingestDocument); + } + + public void testSingleProcessor() throws Exception { + Processor processor = mock(Processor.class); + CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); + compoundProcessor.execute(ingestDocument); + verify(processor, times(1)).execute(ingestDocument); + } + + public void testSingleProcessorWithException() throws Exception { + Processor processor = mock(Processor.class); + doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); + CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); + try { + compoundProcessor.execute(ingestDocument); + fail("should throw exception"); + } catch (Exception e) { + assertThat(e.getMessage(), equalTo("error")); + } + verify(processor, times(1)).execute(ingestDocument); + } + + public void testSingleProcessorWithOnFailureProcessor() throws Exception { + Processor processor = mock(Processor.class); + doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); + Processor processorNext = mock(Processor.class); + CompoundProcessor compoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext)); + assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext)); + compoundProcessor.execute(ingestDocument); + verify(processor, times(1)).execute(ingestDocument); + verify(processorNext, times(1)).execute(ingestDocument); + } + + public void testSingleProcessorWithNestedFailures() throws Exception { + Processor processor = mock(Processor.class); + doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); + Processor processorToFail = mock(Processor.class); + doThrow(new RuntimeException("error")).doNothing().when(processorToFail).execute(ingestDocument); + Processor lastProcessor = mock(Processor.class); + + CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor)); + CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor); + + CompoundProcessor innerCompoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(compoundOnFailProcessor)); + CompoundProcessor compoundProcessor = spy(innerCompoundProcessor); + + assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor)); + compoundProcessor.execute(ingestDocument); + verify(processor, times(1)).execute(ingestDocument); + verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument); + verify(compoundOnFailProcessor, times(1)).execute(ingestDocument); + verify(processorToFail, times(1)).execute(ingestDocument); + verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument); + verify(lastProcessor, times(1)).execute(ingestDocument); + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index 961d820b876..7140bf53a8d 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.ingest.processor.CompoundProcessor; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.set.SetProcessor; import org.elasticsearch.test.ESTestCase; @@ -38,6 +39,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -85,8 +87,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { } public void testExecuteSuccess() throws Exception { - Processor processor = mock(Processor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + CompoundProcessor processor = mock(CompoundProcessor.class); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); Consumer failureHandler = mock(Consumer.class); @@ -99,7 +101,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { } public void testExecutePropagateAllMetaDataUpdates() throws Exception { - Processor processor = mock(Processor.class); + CompoundProcessor processor = mock(CompoundProcessor.class); doAnswer((InvocationOnMock invocationOnMock) -> { IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { @@ -112,7 +114,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { } return null; }).when(processor).execute(any()); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); Consumer failureHandler = mock(Consumer.class); @@ -132,8 +134,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { } public void testExecuteFailure() throws Exception { - Processor processor = mock(Processor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + CompoundProcessor processor = mock(CompoundProcessor.class); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); Consumer failureHandler = mock(Consumer.class); @@ -144,6 +146,57 @@ public class PipelineExecutionServiceTests extends ESTestCase { verify(completionHandler, never()).accept(anyBoolean()); } + public void testExecuteSuccessWithOnFailure() throws Exception { + Processor processor = mock(Processor.class); + Processor onFailureProcessor = mock(Processor.class); + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + Consumer failureHandler = mock(Consumer.class); + Consumer completionHandler = mock(Consumer.class); + executionService.execute(indexRequest, "_id", failureHandler, completionHandler); + //TODO we remove metadata, this check is not valid anymore, what do we replace it with? + //verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(failureHandler, never()).accept(any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(true); + } + + public void testExecuteFailureWithOnFailure() throws Exception { + Processor processor = mock(Processor.class); + Processor onFailureProcessor = mock(Processor.class); + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + Consumer failureHandler = mock(Consumer.class); + Consumer completionHandler = mock(Consumer.class); + executionService.execute(indexRequest, "_id", failureHandler, completionHandler); + verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(failureHandler, times(1)).accept(any(RuntimeException.class)); + verify(completionHandler, never()).accept(anyBoolean()); + } + + public void testExecuteFailureWithNestedOnFailure() throws Exception { + Processor processor = mock(Processor.class); + Processor onFailureProcessor = mock(Processor.class); + Processor onFailureOnFailureProcessor = mock(Processor.class); + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(Arrays.asList(onFailureProcessor),Arrays.asList(onFailureOnFailureProcessor)))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + Consumer failureHandler = mock(Consumer.class); + Consumer completionHandler = mock(Consumer.class); + executionService.execute(indexRequest, "_id", failureHandler, completionHandler); + verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(failureHandler, times(1)).accept(any(RuntimeException.class)); + verify(completionHandler, never()).accept(anyBoolean()); + } + @SuppressWarnings("unchecked") public void testExecuteTTL() throws Exception { // test with valid ttl @@ -152,7 +205,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { config.put("field", "_ttl"); config.put("value", "5d"); Processor processor = metaProcessorFactory.create(config); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); Consumer failureHandler = mock(Consumer.class); @@ -169,7 +222,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { config.put("field", "_ttl"); config.put("value", "abc"); processor = metaProcessorFactory.create(config); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); failureHandler = mock(Consumer.class); @@ -179,7 +232,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { verify(completionHandler, never()).accept(anyBoolean()); // test with provided ttl - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList())); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class))); indexRequest = new IndexRequest("_index", "_type", "_id") .source(Collections.emptyMap()) @@ -217,10 +270,10 @@ public class PipelineExecutionServiceTests extends ESTestCase { String pipelineId = "_id"; - Processor pipeline = mock(Processor.class); + CompoundProcessor processor = mock(CompoundProcessor.class); Exception error = new RuntimeException(); - doThrow(error).when(pipeline).execute(any()); - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, Collections.singletonList(pipeline))); + doThrow(error).when(processor).execute(any()); + when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor)); Consumer requestItemErrorHandler = mock(Consumer.class); Consumer completionHandler = mock(Consumer.class); @@ -241,7 +294,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { } String pipelineId = "_id"; - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, Collections.emptyList())); + when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor())); Consumer requestItemErrorHandler = mock(Consumer.class); Consumer completionHandler = mock(Consumer.class); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 6baf588d0d1..2bb44b80868 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.processor.CompoundProcessor; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.plugin.ingest.IngestBootstrapper; import org.elasticsearch.plugin.ingest.IngestPlugin; @@ -186,7 +187,7 @@ public class IngestActionFilterTests extends ESTestCase { return null; } }; - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); executionService = new PipelineExecutionService(store, threadPool); IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class); when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java index a60b8945f10..c4d12b23a47 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.processor.CompoundProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -47,7 +47,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { private ThreadPool threadPool; private SimulateExecutionService executionService; private Pipeline pipeline; - private Processor processor; + private CompoundProcessor processor; private IngestDocument ingestDocument; @Before @@ -58,10 +58,9 @@ public class SimulateExecutionServiceTests extends ESTestCase { .build() ); executionService = new SimulateExecutionService(threadPool); - processor = mock(Processor.class); + processor = mock(CompoundProcessor.class); when(processor.getType()).thenReturn("mock"); - pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor)); - //ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java index 4bff810a9c2..bee6ddd141c 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.processor.CompoundProcessor; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.test.ESTestCase; @@ -28,6 +29,7 @@ import org.junit.Before; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -49,7 +51,9 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { @Before public void init() throws IOException { - Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, Collections.singletonList(mock(Processor.class))); + CompoundProcessor pipelineCompoundProcessor = mock(CompoundProcessor.class); + when(pipelineCompoundProcessor.getProcessors()).thenReturn(Arrays.asList(mock(Processor.class))); + Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor); Map processorRegistry = new HashMap<>(); processorRegistry.put("mock_processor", mock(Processor.Factory.class)); store = mock(PipelineStore.class); @@ -133,9 +137,28 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { List> processors = new ArrayList<>(); int numProcessors = randomIntBetween(1, 10); for (int i = 0; i < numProcessors; i++) { - processors.add(Collections.singletonMap("mock_processor", Collections.emptyMap())); + Map processorConfig = new HashMap<>(); + List> onFailureProcessors = new ArrayList<>(); + int numOnFailureProcessors = randomIntBetween(0, 1); + for (int j = 0; j < numOnFailureProcessors; j++) { + onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap())); + } + if (numOnFailureProcessors > 0) { + processorConfig.put("on_failure", onFailureProcessors); + } + processors.add(Collections.singletonMap("mock_processor", processorConfig)); } pipelineConfig.put("processors", processors); + + List> onFailureProcessors = new ArrayList<>(); + int numOnFailureProcessors = randomIntBetween(0, 1); + for (int i = 0; i < numOnFailureProcessors; i++) { + onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap())); + } + if (numOnFailureProcessors > 0) { + pipelineConfig.put("on_failure", onFailureProcessors); + } + requestContent.put(Fields.PIPELINE, pipelineConfig); SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, store); diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml index 7dd83313a7f..01b43cfefaa 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml @@ -72,3 +72,53 @@ } ] } + +--- +"Test basic pipeline with on_failure in processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value", + "on_failure": [ + { + "set" : { + "field" : "field2", + "value" : "_failed_value" + } + } + ] + } + } + ] + } + - match: { _index: ".ingest" } + - match: { _type: "pipeline" } + - match: { _version: 1 } + - match: { _id: "my_pipeline" } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline._source.description: "_description" } + - match: { my_pipeline._version: 1 } + + - do: + ingest.delete_pipeline: + id: "my_pipeline" + - match: { _index: ".ingest" } + - match: { _type: "pipeline" } + - match: { _version: 2 } + - match: { _id: "my_pipeline" } + - match: { found: true } + + - do: + catch: missing + ingest.get_pipeline: + id: "my_pipeline" diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml index 641fecf16f1..256587f6549 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml @@ -69,6 +69,48 @@ } - length: { docs: 1 } +--- +"Test simulate with provided pipeline definition with on_failure block": + - do: + ingest.simulate: + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "rename" : { + "field" : "does_not_exist", + "to" : "field2", + "on_failure" : [ + { + "set" : { + "field" : "field2", + "value" : "_value" + } + } + ] + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - length: { docs: 1 } + - match: { docs.0.doc._source.foo: "bar" } + - match: { docs.0.doc._source.field2: "_value" } + - length: { docs.0.doc._ingest: 1 } + - is_true: docs.0.doc._ingest.timestamp + --- "Test simulate with no provided pipeline or pipeline_id": - do: diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_on_failure.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_on_failure.yaml new file mode 100644 index 00000000000..eeae210c2b5 --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_on_failure.yaml @@ -0,0 +1,122 @@ +--- +"Test Pipeline With On Failure Block": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "rename" : { + "field" : "foofield", + "to" : "field1" + } + }, + { + "grok" : { + "field" : "field1", + "pattern" : "%{NUMBER:val} %{NUMBER:status} <%{WORD:msg}>" + } + } + ], + "on_failure" : [ + { + "grok" : { + "field" : "field1", + "pattern" : "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>" + } + }, + { + "set" : { + "field" : "_failed", + "value" : true + } + } + ] + } + - match: { _id: "my_pipeline" } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline" + body: {field1: "123.42 400 "} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.val: 123.42 } + - match: { _source.status: 400 } + - match: { _source.msg: "foo" } + - match: { _source._failed: true } + +--- +"Test Pipeline With Nested Processor On Failures": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "grok" : { + "field" : "field1", + "pattern" : "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>" + } + }, + { + "rename" : { + "field" : "foofield", + "to" : "field1", + "on_failure" : [ + { + "set" : { + "field" : "foofield", + "value" : "exists" + } + }, + { + "rename" : { + "field" : "foofield2", + "to" : "field1", + "on_failure" : [ + { + "set" : { + "field" : "foofield2", + "value" : "ran" + } + } + ] + } + } + ] + } + } + ] + } + - match: { _id: "my_pipeline" } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline" + body: {field1: "123.42 400 "} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.val: 123.42 } + - match: { _source.msg: "foo" } + - match: { _source.status: 400 } + - match: { _source.foofield: "exists" } + - match: { _source.foofield2: "ran" }