From a56fb6192ecf8fc4b596a0b07ced913027327453 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 12 May 2020 15:41:05 -0500 Subject: [PATCH] [7.x] Fix ingest simulate verbose on failure with conditional (#56478) (#56635) If a conditional is added to a processor, and that processor fails, and that processor has an on_failure handler, the full trace of all of the executed processors may not be displayed in simulate verbose. The information is correct, but misses displaying some of the steps used to get there. This happens because a processor that is conditional processor is a wrapper around the real processor and a processor with an on_failure handler is also a wrapper around the processor(s). When decorating for simulation we treat compound processor specially, but if a compound processor is wrapped by a conditional processor that compound processor's processors can be missed for decoration resulting in the missing displayed steps. The fix to this is to treat the conditional processor specially and explicitly seperate it from the processor it is wrapping. This requires us to keep track of 2 processors a possible conditional processor and the actual processor it may be wrapping. related: #56004 --- .../rest-api-spec/test/ingest/90_simulate.yml | 70 +++++++++++++++++++ .../ingest/SimulateExecutionService.java | 2 +- .../ingest/TrackingResultProcessor.java | 65 +++++++++-------- .../ingest/TrackingResultProcessorTests.java | 65 ++++++++++++++--- 4 files changed, 161 insertions(+), 41 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 8bf7c06b567..714564b0a43 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -758,3 +758,73 @@ teardown: - match: { docs.0.processor_results.2.doc._source.pipeline1: true } - match: { docs.0.processor_results.2.doc._source.pipeline2: true } +--- +"Test verbose simulate with true conditional and on failure": +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "rename": { + "tag": "gunna_fail", + "if": "true", + "field": "foo1", + "target_field": "fieldA", + "on_failure": [ + { + "set": { + "field": "failed1", + "value": "failed1", + "tag": "failed1" + } + }, + { + "rename": { + "tag": "gunna_fail_again", + "if": "true", + "field": "foo2", + "target_field": "fieldA", + "on_failure": [ + { + "set": { + "field": "failed2", + "value": "failed2", + "tag": "failed2" + } + } + ] + } + } + ] + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 4 } +- match: { docs.0.processor_results.0.tag: "gunna_fail" } +- match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" } +- match: { docs.0.processor_results.1.tag: "failed1" } +- match: { docs.0.processor_results.1.doc._source.failed1: "failed1" } +- match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" } +- match: { docs.0.processor_results.2.tag: "gunna_fail_again" } +- match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" } +- match: { docs.0.processor_results.3.tag: "failed2" } +- match: { docs.0.processor_results.3.doc._source.failed1: "failed1" } +- match: { docs.0.processor_results.3.doc._source.failed2: "failed2" } +- match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" } + + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 19145709680..66270a5fc7f 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -47,7 +47,7 @@ class SimulateExecutionService { BiConsumer handler) { if (verbose) { List processorResultList = new CopyOnWriteArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); ingestDocument.executePipeline(verbosePipeline, (result, e) -> { diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4abaadb353c..8a84c40f088 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -32,17 +32,27 @@ import java.util.function.BiConsumer; public final class TrackingResultProcessor implements Processor { private final Processor actualProcessor; + private final ConditionalProcessor conditionalProcessor; private final List processorResultList; private final boolean ignoreFailure; - TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, ConditionalProcessor conditionalProcessor, + List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; + this.conditionalProcessor = conditionalProcessor; } @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { + if (conditionalProcessor != null ) { + if (conditionalProcessor.evaluate(ingestDocument) == false) { + handler.accept(ingestDocument, null); + return; + } + } + if (actualProcessor instanceof PipelineProcessor) { PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); @@ -64,7 +74,7 @@ public final class TrackingResultProcessor implements Processor { } } else { //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); ingestDocument.executePipeline(verbosePipeline, handler); @@ -73,36 +83,20 @@ public final class TrackingResultProcessor implements Processor { return; } - final Processor processor; - if (actualProcessor instanceof ConditionalProcessor) { - ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor; - if (conditionalProcessor.evaluate(ingestDocument) == false) { - handler.accept(ingestDocument, null); - return; - } - if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { - processor = conditionalProcessor.getInnerProcessor(); - } else { - processor = actualProcessor; - } - } else { - processor = actualProcessor; - } - - processor.execute(ingestDocument, (result, e) -> { + actualProcessor.execute(ingestDocument, (result, e) -> { if (e != null) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); } handler.accept(null, e); } else { if (result != null) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); handler.accept(result, null); } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag())); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag())); handler.accept(null, null); } } @@ -124,21 +118,34 @@ public final class TrackingResultProcessor implements Processor { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { - List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, ConditionalProcessor parentCondition, + List processorResultList) { + List processors = new ArrayList<>(); for (Processor processor : compoundProcessor.getProcessors()) { + ConditionalProcessor conditionalProcessor = parentCondition; + if (processor instanceof ConditionalProcessor) { + conditionalProcessor = (ConditionalProcessor) processor; + processor = conditionalProcessor.getInnerProcessor(); + } if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + processors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList)); } else { - processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); + processors.add( + new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { + ConditionalProcessor conditionalProcessor = null; + if (processor instanceof ConditionalProcessor) { + conditionalProcessor = (ConditionalProcessor) processor; + processor = conditionalProcessor.getInnerProcessor(); + } if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + onFailureProcessors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList)); } else { - onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); + onFailureProcessors.add( + new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList)); } } return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors); diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index ef4613ce2ff..5aec8a5fcde 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -64,7 +64,7 @@ public class TrackingResultProcessorTests extends ESTestCase { public void testActualProcessor() throws Exception { TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); - TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -81,7 +81,7 @@ public class TrackingResultProcessorTests extends ESTestCase { RuntimeException exception = new RuntimeException("processor failed"); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); Exception[] holder = new Exception[1]; trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); @@ -104,7 +104,7 @@ public class TrackingResultProcessorTests extends ESTestCase { Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); @@ -137,12 +137,53 @@ public class TrackingResultProcessorTests extends ESTestCase { assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); } + public void testActualCompoundProcessorWithOnFailureAndTrueCondition() throws Exception { + String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", exception); + ConditionalProcessor conditionalProcessor = new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + failProcessor); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = + new CompoundProcessor(false, + Arrays.asList(conditionalProcessor), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); + trackingProcessor.execute(ingestDocument, (result, e) -> { + }); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(1)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -173,7 +214,7 @@ public class TrackingResultProcessorTests extends ESTestCase { new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })), new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); - CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); @@ -215,7 +256,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -282,7 +323,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -351,7 +392,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -404,7 +445,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -455,7 +496,7 @@ public class TrackingResultProcessorTests extends ESTestCase { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); Exception[] holder = new Exception[1]; trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); @@ -481,7 +522,7 @@ public class TrackingResultProcessorTests extends ESTestCase { CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); @@ -504,4 +545,6 @@ public class TrackingResultProcessorTests extends ESTestCase { resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); } + + }