[7.x] Fix ingest simulate verbose on failure with conditional (#56478) (#56635)

If a conditional is added to a processor, and that processor fails, and 
that processor has an on_failure handler, the full trace of all of the 
executed processors may not be displayed in simulate verbose. The 
information is correct, but misses displaying some of the steps used 
to get there.

This happens because a processor that is conditional processor is a 
wrapper around the real processor and a processor with an on_failure 
handler is also a wrapper around the processor(s). When decorating for 
simulation we treat compound processor specially, but if a compound processor
is wrapped by a conditional processor that compound processor's processors 
can be missed for decoration resulting in the missing displayed steps.

The fix to this is to treat the conditional processor specially and
explicitly seperate it from the processor it is wrapping. This requires
us to keep track of 2 processors a possible conditional processor and
the actual processor it may be wrapping.

related: #56004
This commit is contained in:
Jake Landis 2020-05-12 15:41:05 -05:00 committed by GitHub
parent cf76a932fb
commit a56fb6192e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 161 additions and 41 deletions

View File

@ -758,3 +758,73 @@ teardown:
- match: { docs.0.processor_results.2.doc._source.pipeline1: true } - match: { docs.0.processor_results.2.doc._source.pipeline1: true }
- match: { docs.0.processor_results.2.doc._source.pipeline2: true } - match: { docs.0.processor_results.2.doc._source.pipeline2: true }
---
"Test verbose simulate with true conditional and on failure":
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"processors": [
{
"rename": {
"tag": "gunna_fail",
"if": "true",
"field": "foo1",
"target_field": "fieldA",
"on_failure": [
{
"set": {
"field": "failed1",
"value": "failed1",
"tag": "failed1"
}
},
{
"rename": {
"tag": "gunna_fail_again",
"if": "true",
"field": "foo2",
"target_field": "fieldA",
"on_failure": [
{
"set": {
"field": "failed2",
"value": "failed2",
"tag": "failed2"
}
}
]
}
}
]
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 4 }
- match: { docs.0.processor_results.0.tag: "gunna_fail" }
- match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" }
- match: { docs.0.processor_results.1.tag: "failed1" }
- match: { docs.0.processor_results.1.doc._source.failed1: "failed1" }
- match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" }
- match: { docs.0.processor_results.2.tag: "gunna_fail_again" }
- match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" }
- match: { docs.0.processor_results.3.tag: "failed2" }
- match: { docs.0.processor_results.3.doc._source.failed1: "failed1" }
- match: { docs.0.processor_results.3.doc._source.failed2: "failed2" }
- match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" }

View File

@ -47,7 +47,7 @@ class SimulateExecutionService {
BiConsumer<SimulateDocumentResult, Exception> handler) { BiConsumer<SimulateDocumentResult, Exception> handler) {
if (verbose) { if (verbose) {
List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>(); List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor); verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> { ingestDocument.executePipeline(verbosePipeline, (result, e) -> {

View File

@ -32,17 +32,27 @@ import java.util.function.BiConsumer;
public final class TrackingResultProcessor implements Processor { public final class TrackingResultProcessor implements Processor {
private final Processor actualProcessor; private final Processor actualProcessor;
private final ConditionalProcessor conditionalProcessor;
private final List<SimulateProcessorResult> processorResultList; private final List<SimulateProcessorResult> processorResultList;
private final boolean ignoreFailure; private final boolean ignoreFailure;
TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) { TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, ConditionalProcessor conditionalProcessor,
List<SimulateProcessorResult> processorResultList) {
this.ignoreFailure = ignoreFailure; this.ignoreFailure = ignoreFailure;
this.processorResultList = processorResultList; this.processorResultList = processorResultList;
this.actualProcessor = actualProcessor; this.actualProcessor = actualProcessor;
this.conditionalProcessor = conditionalProcessor;
} }
@Override @Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (conditionalProcessor != null ) {
if (conditionalProcessor.evaluate(ingestDocument) == false) {
handler.accept(ingestDocument, null);
return;
}
}
if (actualProcessor instanceof PipelineProcessor) { if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
@ -64,7 +74,7 @@ public final class TrackingResultProcessor implements Processor {
} }
} else { } else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor); verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, handler); ingestDocument.executePipeline(verbosePipeline, handler);
@ -73,36 +83,20 @@ public final class TrackingResultProcessor implements Processor {
return; return;
} }
final Processor processor; actualProcessor.execute(ingestDocument, (result, e) -> {
if (actualProcessor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
handler.accept(ingestDocument, null);
return;
}
if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getInnerProcessor();
} else {
processor = actualProcessor;
}
} else {
processor = actualProcessor;
}
processor.execute(ingestDocument, (result, e) -> {
if (e != null) { if (e != null) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e));
} else { } else {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
} }
handler.accept(null, e); handler.accept(null, e);
} else { } else {
if (result != null) { if (result != null) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
handler.accept(result, null); handler.accept(result, null);
} else { } else {
processorResultList.add(new SimulateProcessorResult(processor.getTag())); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag()));
handler.accept(null, null); handler.accept(null, null);
} }
} }
@ -124,21 +118,34 @@ public final class TrackingResultProcessor implements Processor {
return actualProcessor.getTag(); return actualProcessor.getTag();
} }
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) { public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, ConditionalProcessor parentCondition,
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size()); List<SimulateProcessorResult> processorResultList) {
List<Processor> processors = new ArrayList<>();
for (Processor processor : compoundProcessor.getProcessors()) { for (Processor processor : compoundProcessor.getProcessors()) {
ConditionalProcessor conditionalProcessor = parentCondition;
if (processor instanceof ConditionalProcessor) {
conditionalProcessor = (ConditionalProcessor) processor;
processor = conditionalProcessor.getInnerProcessor();
}
if (processor instanceof CompoundProcessor) { if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList)); processors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
} else { } else {
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); processors.add(
new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList));
} }
} }
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) { for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
ConditionalProcessor conditionalProcessor = null;
if (processor instanceof ConditionalProcessor) {
conditionalProcessor = (ConditionalProcessor) processor;
processor = conditionalProcessor.getInnerProcessor();
}
if (processor instanceof CompoundProcessor) { if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); onFailureProcessors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
} else { } else {
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); onFailureProcessors.add(
new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList));
} }
} }
return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors); return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors);

View File

@ -64,7 +64,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualProcessor() throws Exception { public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -81,7 +81,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
RuntimeException exception = new RuntimeException("processor failed"); RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
Exception[] holder = new Exception[1]; Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
@ -104,7 +104,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(failProcessor, onFailureProcessor),
Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor)); Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
@ -137,12 +137,53 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
} }
public void testActualCompoundProcessorWithOnFailureAndTrueCondition() throws Exception {
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", exception);
ConditionalProcessor conditionalProcessor = new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
failProcessor);
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor =
new CompoundProcessor(false,
Arrays.asList(conditionalProcessor),
Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {
});
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
assertThat(failProcessor.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(2));
assertThat(resultList.get(0).getIngestDocument(), nullValue());
assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
assertThat(resultList.get(1).getFailure(), nullValue());
assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}
public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed"); RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
Collections.emptyList()); Collections.emptyList());
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
@ -173,7 +214,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })), new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })),
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }));
CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument);
@ -215,7 +256,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
@ -282,7 +323,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
@ -351,7 +392,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
@ -404,7 +445,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
@ -455,7 +496,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
Exception[] holder = new Exception[1]; Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
@ -481,7 +522,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
@ -504,4 +545,6 @@ public class TrackingResultProcessorTests extends ESTestCase {
resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1));
} }
} }