From 61e4283a16cc12b9daf13e5ba76a34159b3355ba Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 29 Jan 2016 14:36:11 -0800 Subject: [PATCH] Add processor tags to on_failure metadata in ingest pipeline closes #16202 --- .../ingest/core/CompoundProcessor.java | 13 ++++++++----- .../ingest/core/CompoundProcessorTests.java | 17 ++++++++++------- docs/reference/ingest/ingest.asciidoc | 2 +- .../10_pipeline_with_mustache_templates.yaml | 5 +++-- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java index 699720e18ca..c784ea1c57a 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java @@ -32,7 +32,8 @@ import java.util.Objects; */ public class CompoundProcessor implements Processor { static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message"; - static final String ON_FAILURE_PROCESSOR_FIELD = "on_failure_processor"; + static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; + static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; private final List processors; private final List onFailureProcessors; @@ -74,24 +75,26 @@ public class CompoundProcessor implements Processor { if (onFailureProcessors.isEmpty()) { throw e; } else { - executeOnFailure(ingestDocument, e, processor.getType()); + executeOnFailure(ingestDocument, e, processor.getType(), processor.getTag()); } break; } } } - void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType) throws Exception { + void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception { Map ingestMetadata = ingestDocument.getIngestMetadata(); try { ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage()); - ingestMetadata.put(ON_FAILURE_PROCESSOR_FIELD, failedProcessorType); + ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType); + ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag); for (Processor processor : onFailureProcessors) { processor.execute(ingestDocument); } } finally { ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD); - ingestMetadata.remove(ON_FAILURE_PROCESSOR_FIELD); + ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD); + ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java index f21644e6005..7bc8922af41 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java @@ -80,9 +80,10 @@ public class CompoundProcessorTests extends ESTestCase { TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.size(), equalTo(2)); + assertThat(ingestMetadata.size(), equalTo(3)); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); }); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)); @@ -94,18 +95,20 @@ public class CompoundProcessorTests extends ESTestCase { public void testSingleProcessorWithNestedFailures() throws Exception { TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); - TestProcessor processorToFail = new TestProcessor("id", "second", ingestDocument -> { + TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.size(), equalTo(2)); + assertThat(ingestMetadata.size(), equalTo(3)); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); throw new RuntimeException("error"); }); TestProcessor lastProcessor = new TestProcessor(ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.size(), equalTo(2)); + assertThat(ingestMetadata.size(), equalTo(3)); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); }); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor)); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor)); diff --git a/docs/reference/ingest/ingest.asciidoc b/docs/reference/ingest/ingest.asciidoc index e1ce35eb23c..dfe377ecac4 100644 --- a/docs/reference/ingest/ingest.asciidoc +++ b/docs/reference/ingest/ingest.asciidoc @@ -725,7 +725,7 @@ the index for which failed documents get sent. Sometimes you may want to retrieve the actual error message that was thrown by a failed processor. To do so you can access metadata fields called -`on_failure_message` and `on_failure_processor`. These fields are only accessible +`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag`. These fields are only accessible from within the context of an `on_failure` block. Here is an updated version of our first example which leverages these fields to provide the error message instead of manually setting it. diff --git a/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml b/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml index 9e644773c6a..491e1dae292 100644 --- a/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml +++ b/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml @@ -185,12 +185,13 @@ "processors": [ { "remove" : { + "tag" : "first_processor", "field" : "field_to_remove", "on_failure" : [ { "set" : { "field" : "error", - "value" : "processor [{{ _ingest.on_failure_processor }}]: {{ _ingest.on_failure_message }}" + "value" : "processor {{ _ingest.on_failure_processor_tag }} [{{ _ingest.on_failure_processor_type }}]: {{ _ingest.on_failure_message }}" } } ] @@ -217,4 +218,4 @@ id: 1 - length: { _source: 2 } - match: { _source.do_nothing: "foo" } - - match: { _source.error: "processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" } + - match: { _source.error: "processor first_processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" }