NIFI-4278 - add error message to invalid FFs in ValidateXml

This closes #2069.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Pierre Villard 2017-08-09 12:34:24 +02:00 committed by Andy LoPresto
parent 9f1267e949
commit 6ff8321cf7
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
2 changed files with 27 additions and 5 deletions

View File

@ -37,6 +37,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -57,9 +59,15 @@ import org.xml.sax.SAXException;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"xml", "schema", "validation", "xsd"}) @Tags({"xml", "schema", "validation", "xsd"})
@WritesAttributes({
@WritesAttribute(attribute = "validatexml.invalid.error", description = "If the flow file is routed to the invalid relationship "
+ "the attribute will contain the error message resulting from the validation failure.")
})
@CapabilityDescription("Validates the contents of FlowFiles against a user-specified XML Schema file") @CapabilityDescription("Validates the contents of FlowFiles against a user-specified XML Schema file")
public class ValidateXml extends AbstractProcessor { public class ValidateXml extends AbstractProcessor {
public static final String ERROR_ATTRIBUTE_KEY = "validatexml.invalid.error";
public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
.name("Schema File") .name("Schema File")
.description("The path to the Schema file that is to be used for validation") .description("The path to the Schema file that is to be used for validation")
@ -127,8 +135,10 @@ public class ValidateXml extends AbstractProcessor {
final Validator validator = schema.newValidator(); final Validator validator = schema.newValidator();
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
for (final FlowFile flowFile : flowFiles) { for (FlowFile flowFile : flowFiles) {
final AtomicBoolean valid = new AtomicBoolean(true); final AtomicBoolean valid = new AtomicBoolean(true);
final AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
session.read(flowFile, new InputStreamCallback() { session.read(flowFile, new InputStreamCallback() {
@Override @Override
public void process(final InputStream in) throws IOException { public void process(final InputStream in) throws IOException {
@ -136,17 +146,18 @@ public class ValidateXml extends AbstractProcessor {
validator.validate(new StreamSource(in)); validator.validate(new StreamSource(in));
} catch (final IllegalArgumentException | SAXException e) { } catch (final IllegalArgumentException | SAXException e) {
valid.set(false); valid.set(false);
logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e}); exception.set(e);
} }
} }
}); });
if (valid.get()) { if (valid.get()) {
logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile}); logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
session.getProvenanceReporter().route(flowFile, REL_VALID); session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID); session.transfer(flowFile, REL_VALID);
} else { } else {
logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile}); flowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, exception.get().getLocalizedMessage());
logger.info("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile, exception.get().getLocalizedMessage()});
session.getProvenanceReporter().route(flowFile, REL_INVALID); session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.transfer(flowFile, REL_INVALID); session.transfer(flowFile, REL_INVALID);
} }

View File

@ -21,7 +21,6 @@ import java.nio.file.Paths;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
@ -38,4 +37,16 @@ public class TestValidateXml {
runner.assertAllFlowFilesTransferred(ValidateXml.REL_VALID, 1); runner.assertAllFlowFilesTransferred(ValidateXml.REL_VALID, 1);
} }
@Test
public void testInvalid() throws IOException, SAXException {
final TestRunner runner = TestRunners.newTestRunner(new ValidateXml());
runner.setProperty(ValidateXml.SCHEMA_FILE, "src/test/resources/TestXml/XmlBundle.xsd");
runner.enqueue("<this>is an invalid</xml>");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateXml.REL_INVALID, 1);
runner.assertAllFlowFilesContainAttribute(ValidateXml.REL_INVALID, ValidateXml.ERROR_ATTRIBUTE_KEY);
}
} }