diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java index 8ccea5a898..1ea70e2081 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -174,6 +174,15 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { }); } catch (final Exception e) { getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e}); + // Since we are wrapping the exceptions above there should always be a cause + // but it's possible it might not have a message. This handles that by logging + // the name of the class thrown. + Throwable c = e.getCause(); + if (c != null) { + session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown"); + } else { + session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown"); + } session.transfer(flowFile, REL_FAILURE); return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java index 1be1794217..a1e6f99782 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java @@ -41,7 +41,8 @@ import java.util.List; @Tags({"convert", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) @WritesAttributes({ @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), - @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") + @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"), + @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") }) @CapabilityDescription("Converts records from one data format to another using configured Record Reader and Record Write Controller Services. " + "The Reader and Writer must be configured with \"matching\" schemas. By this, we mean the schemas must have the same field names. The types of the fields " diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 8ee5f433c3..65abdc9dc9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -67,7 +68,10 @@ import java.util.stream.Stream; + "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should " + "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from " + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the Property.") -@WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression."), + @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") +}) @SeeAlso({ConvertRecord.class}) public class UpdateRecord extends AbstractRecordProcessor { private static final String FIELD_NAME = "field.name"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java index 822f664aed..7870a040a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -120,7 +119,7 @@ public class TestConvertRecord { } @Test - public void testReadFailure() throws InitializationException { + public void testReadFailure() throws InitializationException, IOException { final MockRecordParser readerService = new MockRecordParser(2); final MockRecordWriter writerService = new MockRecordWriter("header", false); @@ -146,12 +145,13 @@ public class TestConvertRecord { // Original FlowFile should be routed to 'failure' relationship without modification runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0); - assertTrue(original == out); + out.assertContentEquals(original.toByteArray()); + out.assertAttributeEquals("record.error.message","Intentional Unit Test Exception because 2 records have been read"); } @Test - public void testWriteFailure() throws InitializationException { + public void testWriteFailure() throws InitializationException, IOException { final MockRecordParser readerService = new MockRecordParser(); final MockRecordWriter writerService = new MockRecordWriter("header", false, 2); @@ -177,7 +177,8 @@ public class TestConvertRecord { // Original FlowFile should be routed to 'failure' relationship without modification runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0); - assertTrue(original == out); + out.assertContentEquals(original.toByteArray()); + out.assertAttributeEquals("record.error.message","Unit Test intentionally throwing IOException after 2 records were written"); } @Test