Add processor tags to on_failure metadata in ingest pipeline

closes #16202
This commit is contained in:
Tal Levy 2016-01-29 14:36:11 -08:00
parent 6e91d65a99
commit 61e4283a16
4 changed files with 22 additions and 15 deletions

View File

@ -32,7 +32,8 @@ import java.util.Objects;
*/ */
public class CompoundProcessor implements Processor { public class CompoundProcessor implements Processor {
static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message"; static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
static final String ON_FAILURE_PROCESSOR_FIELD = "on_failure_processor"; static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
private final List<Processor> processors; private final List<Processor> processors;
private final List<Processor> onFailureProcessors; private final List<Processor> onFailureProcessors;
@ -74,24 +75,26 @@ public class CompoundProcessor implements Processor {
if (onFailureProcessors.isEmpty()) { if (onFailureProcessors.isEmpty()) {
throw e; throw e;
} else { } else {
executeOnFailure(ingestDocument, e, processor.getType()); executeOnFailure(ingestDocument, e, processor.getType(), processor.getTag());
} }
break; break;
} }
} }
} }
void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType) throws Exception { void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
try { try {
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage()); ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_FIELD, failedProcessorType); ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
for (Processor processor : onFailureProcessors) { for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument); processor.execute(ingestDocument);
} }
} finally { } finally {
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD); ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_FIELD); ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
} }
} }
} }

View File

@ -80,9 +80,10 @@ public class CompoundProcessorTests extends ESTestCase {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> { TestProcessor processor2 = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2)); assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
}); });
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2));
@ -94,18 +95,20 @@ public class CompoundProcessorTests extends ESTestCase {
public void testSingleProcessorWithNestedFailures() throws Exception { public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id", "second", ingestDocument -> { TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2)); assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
throw new RuntimeException("error"); throw new RuntimeException("error");
}); });
TestProcessor lastProcessor = new TestProcessor(ingestDocument -> { TestProcessor lastProcessor = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2)); assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2"));
}); });
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor)); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor)); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor));

View File

@ -725,7 +725,7 @@ the index for which failed documents get sent.
Sometimes you may want to retrieve the actual error message that was thrown Sometimes you may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access metadata fields called by a failed processor. To do so you can access metadata fields called
`on_failure_message` and `on_failure_processor`. These fields are only accessible `on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag`. These fields are only accessible
from within the context of an `on_failure` block. Here is an updated version of from within the context of an `on_failure` block. Here is an updated version of
our first example which leverages these fields to provide the error message instead our first example which leverages these fields to provide the error message instead
of manually setting it. of manually setting it.

View File

@ -185,12 +185,13 @@
"processors": [ "processors": [
{ {
"remove" : { "remove" : {
"tag" : "first_processor",
"field" : "field_to_remove", "field" : "field_to_remove",
"on_failure" : [ "on_failure" : [
{ {
"set" : { "set" : {
"field" : "error", "field" : "error",
"value" : "processor [{{ _ingest.on_failure_processor }}]: {{ _ingest.on_failure_message }}" "value" : "processor {{ _ingest.on_failure_processor_tag }} [{{ _ingest.on_failure_processor_type }}]: {{ _ingest.on_failure_message }}"
} }
} }
] ]
@ -217,4 +218,4 @@
id: 1 id: 1
- length: { _source: 2 } - length: { _source: 2 }
- match: { _source.do_nothing: "foo" } - match: { _source.do_nothing: "foo" }
- match: { _source.error: "processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" } - match: { _source.error: "processor first_processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" }