NIFI-7139 Add record.error.message on failure of a record reader or writer

Handle scenario where message might be null.

Update to test case that was failing because adding attributes modified a flow file even if you don't change the contents.

Fixed Style Issues and Updated WritesAttributes.

Added Test Case for Error Message

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4052
This commit is contained in:
Shawn Weeks 2020-02-13 08:39:49 -06:00 committed by Matthew Burgess
parent bad0f10a52
commit 0bb8ce7438
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 22 additions and 7 deletions

View File

@ -174,6 +174,15 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
});
} catch (final Exception e) {
getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = e.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE);
return;
}

View File

@ -41,7 +41,8 @@ import java.util.List;
@Tags({"convert", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
})
@CapabilityDescription("Converts records from one data format to another using configured Record Reader and Record Write Controller Services. "
+ "The Reader and Writer must be configured with \"matching\" schemas. By this, we mean the schemas must have the same field names. The types of the fields "

View File

@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -67,7 +68,10 @@ import java.util.stream.Stream;
+ "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should "
+ "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from "
+ "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.")
@WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression.")
@WritesAttributes({
@WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression."),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
})
@SeeAlso({ConvertRecord.class})
public class UpdateRecord extends AbstractRecordProcessor {
private static final String FIELD_NAME = "field.name";

View File

@ -18,7 +18,6 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -120,7 +119,7 @@ public class TestConvertRecord {
}
@Test
public void testReadFailure() throws InitializationException {
public void testReadFailure() throws InitializationException, IOException {
final MockRecordParser readerService = new MockRecordParser(2);
final MockRecordWriter writerService = new MockRecordWriter("header", false);
@ -146,12 +145,13 @@ public class TestConvertRecord {
// Original FlowFile should be routed to 'failure' relationship without modification
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
assertTrue(original == out);
out.assertContentEquals(original.toByteArray());
out.assertAttributeEquals("record.error.message","Intentional Unit Test Exception because 2 records have been read");
}
@Test
public void testWriteFailure() throws InitializationException {
public void testWriteFailure() throws InitializationException, IOException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false, 2);
@ -177,7 +177,8 @@ public class TestConvertRecord {
// Original FlowFile should be routed to 'failure' relationship without modification
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
assertTrue(original == out);
out.assertContentEquals(original.toByteArray());
out.assertAttributeEquals("record.error.message","Unit Test intentionally throwing IOException after 2 records were written");
}
@Test