diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 30efbe1b0fa..bc40a8368f0 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -23,13 +23,14 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.Pipeline; -import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.CompoundProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate; + class SimulateExecutionService { private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT; @@ -40,40 +41,16 @@ class SimulateExecutionService { this.threadPool = threadPool; } - void executeVerboseDocument(Processor processor, IngestDocument ingestDocument, List processorResultList) throws Exception { - if (processor instanceof CompoundProcessor) { - CompoundProcessor cp = (CompoundProcessor) processor; - try { - for (Processor p : cp.getProcessors()) { - executeVerboseDocument(p, ingestDocument, processorResultList); - } - } catch (Exception e) { - for (Processor p : cp.getOnFailureProcessors()) { - executeVerboseDocument(p, ingestDocument, processorResultList); - } - } - } else { - try { - processor.execute(ingestDocument); - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); - } catch (Exception e) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); - throw e; - } - } - } - SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { if (verbose) { List processorResultList = new ArrayList<>(); - IngestDocument currentIngestDocument = new IngestDocument(ingestDocument); - CompoundProcessor pipelineProcessor = new CompoundProcessor(pipeline.getProcessors(), pipeline.getOnFailureProcessors()); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); try { - executeVerboseDocument(pipelineProcessor, currentIngestDocument, processorResultList); + verbosePipelineProcessor.execute(ingestDocument); + return new SimulateDocumentVerboseResult(processorResultList); } catch (Exception e) { - return new SimulateDocumentBaseResult(e); + return new SimulateDocumentVerboseResult(processorResultList); } - return new SimulateDocumentVerboseResult(processorResultList); } else { try { pipeline.execute(ingestDocument); diff --git a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java index ddf3781d1a6..16b3aa10a22 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java @@ -28,15 +28,16 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * A Processor that executes a list of other "processors". It executes a separate list of * "onFailureProcessors" when any of the processors throw an {@link Exception}. */ public class CompoundProcessor implements Processor { - static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message"; - static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; - static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; + public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message"; + public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; + public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; private final List processors; private final List onFailureProcessors; @@ -84,7 +85,7 @@ public class CompoundProcessor implements Processor { @Override public String getTag() { - return "compound-processor-" + Objects.hash(processors, onFailureProcessors); + return "CompoundProcessor-" + flattenProcessors().stream().map(Processor::getTag).collect(Collectors.joining("-")); } @Override @@ -104,18 +105,27 @@ public class CompoundProcessor implements Processor { } void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception { - Map ingestMetadata = ingestDocument.getIngestMetadata(); try { - ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage()); - ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType); - ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag); + putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag); for (Processor processor : onFailureProcessors) { processor.execute(ingestDocument); } } finally { - ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD); - ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD); - ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); + removeFailureMetadata(ingestDocument); } } + + private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage()); + ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType); + ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag); + } + + private void removeFailureMetadata(IngestDocument ingestDocument) { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD); + ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD); + ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); + } } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java index 821a44c0a96..aaae929e0a9 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java @@ -68,6 +68,13 @@ public final class Pipeline { return description; } + /** + * Get the underlying {@link CompoundProcessor} containing the Pipeline's processors + */ + public CompoundProcessor getCompoundProcessor() { + return compoundProcessor; + } + /** * Unmodifiable list containing each processor that operates on the data. */ diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java new file mode 100644 index 00000000000..af820318d83 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.ingest.core.CompoundProcessor; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.Processor; + +import java.util.ArrayList; +import java.util.List; + +/** + * Processor to be used within Simulate API to keep track of processors executed in pipeline. + */ +public final class TrackingResultProcessor implements Processor { + + private final Processor actualProcessor; + private final List processorResultList; + + public TrackingResultProcessor(Processor actualProcessor, List processorResultList) { + this.processorResultList = processorResultList; + if (actualProcessor instanceof CompoundProcessor) { + CompoundProcessor trackedCompoundProcessor = decorate((CompoundProcessor) actualProcessor, processorResultList); + this.actualProcessor = trackedCompoundProcessor; + } else { + this.actualProcessor = actualProcessor; + } + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + try { + actualProcessor.execute(ingestDocument); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); + } catch (Exception e) { + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); + throw e; + } + } + + @Override + public String getType() { + return actualProcessor.getType(); + } + + @Override + public String getTag() { + return actualProcessor.getTag(); + } + + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { + List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); + for (Processor processor : compoundProcessor.getProcessors()) { + if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList)); + } else { + processors.add(new TrackingResultProcessor(processor, processorResultList)); + } + } + List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); + for (Processor processor : compoundProcessor.getOnFailureProcessors()) { + if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + } else { + onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList)); + } + } + return new CompoundProcessor(processors, onFailureProcessors); + } +} + diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index cf1cab2416b..f66dfa81eac 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -31,10 +31,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -46,7 +44,6 @@ public class SimulateExecutionServiceTests extends ESTestCase { private ThreadPool threadPool; private SimulateExecutionService executionService; - private Pipeline pipeline; private Processor processor; private IngestDocument ingestDocument; @@ -59,7 +56,6 @@ public class SimulateExecutionServiceTests extends ESTestCase { ); executionService = new SimulateExecutionService(threadPool); processor = new TestProcessor("id", "mock", ingestDocument -> {}); - pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); } @@ -68,74 +64,6 @@ public class SimulateExecutionServiceTests extends ESTestCase { threadPool.shutdown(); } - public void testExecuteVerboseDocumentSimple() throws Exception { - List processorResultList = new ArrayList<>(); - executionService.executeVerboseDocument(processor, ingestDocument, processorResultList); - SimulateProcessorResult result = new SimulateProcessorResult("id", ingestDocument); - assertThat(processorResultList.size(), equalTo(1)); - assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag())); - assertThat(processorResultList.get(0).getIngestDocument(), equalTo(result.getIngestDocument())); - assertThat(processorResultList.get(0).getFailure(), nullValue()); - } - - public void testExecuteVerboseDocumentSimpleException() throws Exception { - RuntimeException exception = new RuntimeException("mock_exception"); - TestProcessor processor = new TestProcessor("id", "mock", ingestDocument -> { throw exception; }); - List processorResultList = new ArrayList<>(); - try { - executionService.executeVerboseDocument(processor, ingestDocument, processorResultList); - fail("should throw exception"); - } catch (RuntimeException e) { - assertThat(e.getMessage(), equalTo("mock_exception")); - } - SimulateProcessorResult result = new SimulateProcessorResult("id", exception); - assertThat(processorResultList.size(), equalTo(1)); - assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag())); - assertThat(processorResultList.get(0).getFailure(), equalTo(result.getFailure())); - } - - public void testExecuteVerboseDocumentCompoundSuccess() throws Exception { - TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { }); - TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { }); - - Processor compoundProcessor = new CompoundProcessor(processor1, processor2); - List processorResultList = new ArrayList<>(); - executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList); - assertThat(processor1.getInvokedCounter(), equalTo(1)); - assertThat(processor2.getInvokedCounter(), equalTo(1)); - assertThat(processorResultList.size(), equalTo(2)); - assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1")); - assertThat(processorResultList.get(0).getIngestDocument(), equalTo(ingestDocument)); - assertThat(processorResultList.get(0).getFailure(), nullValue()); - assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2")); - assertThat(processorResultList.get(1).getIngestDocument(), equalTo(ingestDocument)); - assertThat(processorResultList.get(1).getFailure(), nullValue()); - } - - public void testExecuteVerboseDocumentCompoundOnFailure() throws Exception { - TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { }); - TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { throw new RuntimeException("p2_exception"); }); - TestProcessor onFailureProcessor1 = new TestProcessor("fail_p1", "mock", ingestDocument -> { }); - TestProcessor onFailureProcessor2 = new TestProcessor("fail_p2", "mock", ingestDocument -> { throw new RuntimeException("fail_p2_exception"); }); - TestProcessor onFailureProcessor3 = new TestProcessor("fail_p3", "mock", ingestDocument -> { }); - CompoundProcessor onFailureCompoundProcessor = new CompoundProcessor(Collections.singletonList(onFailureProcessor2), Collections.singletonList(onFailureProcessor3)); - - Processor compoundProcessor = new CompoundProcessor(Arrays.asList(processor1, processor2), Arrays.asList(onFailureProcessor1, onFailureCompoundProcessor)); - List processorResultList = new ArrayList<>(); - executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList); - assertThat(processor1.getInvokedCounter(), equalTo(1)); - assertThat(processor2.getInvokedCounter(), equalTo(1)); - assertThat(onFailureProcessor1.getInvokedCounter(), equalTo(1)); - assertThat(onFailureProcessor2.getInvokedCounter(), equalTo(1)); - assertThat(onFailureProcessor3.getInvokedCounter(), equalTo(1)); - assertThat(processorResultList.size(), equalTo(5)); - assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1")); - assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2")); - assertThat(processorResultList.get(2).getProcessorTag(), equalTo("fail_p1")); - assertThat(processorResultList.get(3).getProcessorTag(), equalTo("fail_p2")); - assertThat(processorResultList.get(4).getProcessorTag(), equalTo("fail_p3")); - } - public void testExecuteVerboseItem() throws Exception { TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); @@ -170,16 +98,43 @@ public class SimulateExecutionServiceTests extends ESTestCase { assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); } - public void testExecuteVerboseItemWithFailure() throws Exception { + public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception { + TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {}); + TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); + TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); + Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3)); + SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertThat(processor2.getInvokedCounter(), equalTo(1)); + assertThat(processor3.getInvokedCounter(), equalTo(0)); + assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); + SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; + assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0")); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue()); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), equalTo(ingestDocument)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata()))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1")); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue()); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class)); + RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(); + assertThat(runtimeException.getMessage(), equalTo("processor failed")); + } + + public void testExecuteVerboseItemWithOnFailure() throws Exception { TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2))); + TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); + Pipeline pipeline = new Pipeline("_id", "_description", + new CompoundProcessor(new CompoundProcessor(Collections.singletonList(processor1), + Collections.singletonList(processor2)), processor3)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; - assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(3)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), nullValue()); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), instanceOf(RuntimeException.class)); @@ -187,8 +142,20 @@ public class SimulateExecutionServiceTests extends ESTestCase { assertThat(runtimeException.getMessage(), equalTo("processor failed")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument))); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument)); + + IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument); + Map metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata(); + metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock"); + metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0"); + metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed"); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocumentWithOnFailureMetadata)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue()); + + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2")); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), equalTo(ingestDocument)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue()); } public void testExecuteItemWithFailure() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java new file mode 100644 index 00000000000..e53eec56cf1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.core.CompoundProcessor; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; +import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; +import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class TrackingResultProcessorTests extends ESTestCase { + + private IngestDocument ingestDocument; + private List resultList; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + resultList = new ArrayList<>(); + } + + public void testActualProcessor() throws Exception { + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(actualProcessor, resultList); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithoutOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + try { + trackingProcessor.execute(ingestDocument); + } catch (Exception e) { + assertThat(e.getMessage(), equalTo(exception.getMessage())); + } + + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = new CompoundProcessor( + Arrays.asList(new CompoundProcessor( + Arrays.asList(failProcessor, onFailureProcessor), + Arrays.asList(onFailureProcessor, failProcessor))), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(2)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); + assertThat(resultList.size(), equalTo(4)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + + assertThat(resultList.get(2).getIngestDocument(), nullValue()); + assertThat(resultList.get(2).getFailure(), equalTo(exception)); + assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("compound")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("CompoundProcessor-fail-success-success-fail")); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } +} diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml index 0afa99cc6cc..27114ac61f2 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml @@ -274,3 +274,55 @@ id: 1 - length: { _source: 2 } - match: { _source.values_flat: ["foo_bar", "foo_baz"] } + +--- +"Test verbose simulate with error context": + - do: + cluster.health: + wait_for_status: green + - do: + ingest.simulate: + verbose: true + body: > + { + "pipeline" : { + "description": "_description", + "processors": [ + { + "rename" : { + "tag" : "rename-status", + "field" : "status", + "to" : "bar", + "on_failure" : [ + { + "set" : { + "tag" : "set_on_rename_failure", + "field" : "error", + "value" : "processor {{ _ingest.on_failure_processor_tag }} [{{ _ingest.on_failure_processor_type }}]: {{ _ingest.on_failure_message }}" + } + } + ] + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - length: { docs: 1 } + - length: { docs.0.processor_results: 2 } + - match: { docs.0.processor_results.0.tag: "rename-status" } + - match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" } + - match: { docs.0.processor_results.0.error.reason: "field [status] doesn't exist" } + - match: { docs.0.processor_results.1.tag: "set_on_rename_failure" } + - length: { docs.0.processor_results.1.doc._source: 2 } + - match: { docs.0.processor_results.1.doc._source.foo: "bar" } + - match: { docs.0.processor_results.1.doc._source.error: "processor rename-status [rename]: field [status] doesn't exist" }