diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 8bf7c06b567..714564b0a43 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -758,3 +758,73 @@ teardown: - match: { docs.0.processor_results.2.doc._source.pipeline1: true } - match: { docs.0.processor_results.2.doc._source.pipeline2: true } +--- +"Test verbose simulate with true conditional and on failure": +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "rename": { + "tag": "gunna_fail", + "if": "true", + "field": "foo1", + "target_field": "fieldA", + "on_failure": [ + { + "set": { + "field": "failed1", + "value": "failed1", + "tag": "failed1" + } + }, + { + "rename": { + "tag": "gunna_fail_again", + "if": "true", + "field": "foo2", + "target_field": "fieldA", + "on_failure": [ + { + "set": { + "field": "failed2", + "value": "failed2", + "tag": "failed2" + } + } + ] + } + } + ] + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 4 } +- match: { docs.0.processor_results.0.tag: "gunna_fail" } +- match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" } +- match: { docs.0.processor_results.1.tag: "failed1" } +- match: { docs.0.processor_results.1.doc._source.failed1: "failed1" } +- match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" } +- match: { docs.0.processor_results.2.tag: "gunna_fail_again" } +- match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" } +- match: { docs.0.processor_results.3.tag: "failed2" } +- match: { docs.0.processor_results.3.doc._source.failed1: "failed1" } +- match: { docs.0.processor_results.3.doc._source.failed2: "failed2" } +- match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" } + + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 19145709680..66270a5fc7f 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -47,7 +47,7 @@ class SimulateExecutionService { BiConsumer handler) { if (verbose) { List processorResultList = new CopyOnWriteArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); ingestDocument.executePipeline(verbosePipeline, (result, e) -> { diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4abaadb353c..8a84c40f088 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -32,17 +32,27 @@ import java.util.function.BiConsumer; public final class TrackingResultProcessor implements Processor { private final Processor actualProcessor; + private final ConditionalProcessor conditionalProcessor; private final List processorResultList; private final boolean ignoreFailure; - TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, ConditionalProcessor conditionalProcessor, + List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; + this.conditionalProcessor = conditionalProcessor; } @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { + if (conditionalProcessor != null ) { + if (conditionalProcessor.evaluate(ingestDocument) == false) { + handler.accept(ingestDocument, null); + return; + } + } + if (actualProcessor instanceof PipelineProcessor) { PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); @@ -64,7 +74,7 @@ public final class TrackingResultProcessor implements Processor { } } else { //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); ingestDocument.executePipeline(verbosePipeline, handler); @@ -73,36 +83,20 @@ public final class TrackingResultProcessor implements Processor { return; } - final Processor processor; - if (actualProcessor instanceof ConditionalProcessor) { - ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor; - if (conditionalProcessor.evaluate(ingestDocument) == false) { - handler.accept(ingestDocument, null); - return; - } - if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { - processor = conditionalProcessor.getInnerProcessor(); - } else { - processor = actualProcessor; - } - } else { - processor = actualProcessor; - } - - processor.execute(ingestDocument, (result, e) -> { + actualProcessor.execute(ingestDocument, (result, e) -> { if (e != null) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); } handler.accept(null, e); } else { if (result != null) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); handler.accept(result, null); } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag())); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag())); handler.accept(null, null); } } @@ -124,21 +118,34 @@ public final class TrackingResultProcessor implements Processor { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { - List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, ConditionalProcessor parentCondition, + List processorResultList) { + List processors = new ArrayList<>(); for (Processor processor : compoundProcessor.getProcessors()) { + ConditionalProcessor conditionalProcessor = parentCondition; + if (processor instanceof ConditionalProcessor) { + conditionalProcessor = (ConditionalProcessor) processor; + processor = conditionalProcessor.getInnerProcessor(); + } if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + processors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList)); } else { - processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); + processors.add( + new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { + ConditionalProcessor conditionalProcessor = null; + if (processor instanceof ConditionalProcessor) { + conditionalProcessor = (ConditionalProcessor) processor; + processor = conditionalProcessor.getInnerProcessor(); + } if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + onFailureProcessors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList)); } else { - onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); + onFailureProcessors.add( + new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList)); } } return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors); diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index ef4613ce2ff..5aec8a5fcde 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -64,7 +64,7 @@ public class TrackingResultProcessorTests extends ESTestCase { public void testActualProcessor() throws Exception { TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); - TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -81,7 +81,7 @@ public class TrackingResultProcessorTests extends ESTestCase { RuntimeException exception = new RuntimeException("processor failed"); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); Exception[] holder = new Exception[1]; trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); @@ -104,7 +104,7 @@ public class TrackingResultProcessorTests extends ESTestCase { Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); @@ -137,12 +137,53 @@ public class TrackingResultProcessorTests extends ESTestCase { assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); } + public void testActualCompoundProcessorWithOnFailureAndTrueCondition() throws Exception { + String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", exception); + ConditionalProcessor conditionalProcessor = new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + failProcessor); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = + new CompoundProcessor(false, + Arrays.asList(conditionalProcessor), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); + trackingProcessor.execute(ingestDocument, (result, e) -> { + }); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(1)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -173,7 +214,7 @@ public class TrackingResultProcessorTests extends ESTestCase { new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })), new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); - CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); @@ -215,7 +256,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -282,7 +323,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -351,7 +392,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -404,7 +445,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -455,7 +496,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); Exception[] holder = new Exception[1]; trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); @@ -481,7 +522,7 @@ public class TrackingResultProcessorTests extends ESTestCase { CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -504,4 +545,6 @@ public class TrackingResultProcessorTests extends ESTestCase { resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); } + + }