Providing validation of the input FlowFile as JSON

This commit is contained in:
Aldrin Piri 2015-02-16 10:56:06 -05:00
parent 40da65f193
commit c3c4d36944
2 changed files with 62 additions and 8 deletions

View File

@ -16,9 +16,9 @@
*/
package org.apache.nifi.processors.standard;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.*;
import com.jayway.jsonpath.spi.json.JsonProvider;
import net.minidev.json.JSONValue;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -32,10 +32,11 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.io.InputStreamReader;
import java.util.*;
@EventDriven
@ -63,6 +64,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
@Override
protected void init(final ProcessorInitializationContext context) {
@ -122,19 +125,56 @@ public class EvaluateJsonPath extends AbstractProcessor {
}
@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
final FlowFile flowFile = processSession.get();
if (flowFile == null) {
return;
}
// Determine the destination
final String destination = processContext.getProperty(DESTINATION).getValue();
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
// Parse the document once to support multiple path evaluations if specified
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
// Parse the document once to support multiple path evaluations if specified
Object document = Configuration.defaultConfiguration().jsonProvider().parse(in, StandardCharsets.UTF_8.displayName());
/*
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition.
* Accordingly, a strict JSON approach is preferred in determining whether or not a document is valid.
*/
boolean validJson = JSONValue.isValidJsonStrict(new InputStreamReader(in));
if (validJson) {
DocumentContext ctx = JsonPath.parse(in);
contextHolder.set(ctx);
} else {
getLogger().error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile.getId()});
processSession.transfer(flowFile, REL_FAILURE);
}
}
});
DocumentContext documentContext = contextHolder.get();
if (documentContext == null) {
return;
}
try {
switch (destination) {
case DESTINATION_ATTRIBUTE:
break;
case DESTINATION_CONTENT:
break;
}
processSession.transfer(flowFile, REL_MATCH);
} catch (PathNotFoundException e) {
getLogger().warn("FlowFile {} could not be read from.", new Object[]{flowFile.getId()}, e);
processSession.transfer(flowFile, REL_NO_MATCH);
}
}
private static class JsonPathValidator implements Validator {
@ -149,4 +189,5 @@ public class EvaluateJsonPath extends AbstractProcessor {
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -27,6 +28,7 @@ import java.nio.file.Paths;
public class TestEvaluateJsonPath {
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
@Test(expected = AssertionError.class)
public void testInvalidJsonPath() {
@ -37,4 +39,15 @@ public class TestEvaluateJsonPath {
Assert.fail("An improper JsonPath expression was not detected as being invalid.");
}
@Test
public void testInvalidJsonDocument() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
testRunner.enqueue(XML_SNIPPET);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EvaluateJsonPath.REL_FAILURE, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_FAILURE).get(0);
}
}