Completing initial functionality of EvaluateJsonPath and associated tests.

This commit is contained in:
Aldrin Piri 2015-02-16 18:03:24 -05:00
parent c3c4d36944
commit b3328490c6
2 changed files with 156 additions and 37 deletions

View File

@ -29,14 +29,21 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
@EventDriven
@ -49,6 +56,10 @@ public class EvaluateJsonPath extends AbstractProcessor {
// Values of the Destination property: results go to a FlowFile attribute or replace the FlowFile content.
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content";
// Values of the Return Type property.
public static final String RETURN_TYPE_AUTO = "auto-detect";
public static final String RETURN_TYPE_JSON = "json";
public static final String RETURN_TYPE_STRING = "string";
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination")
.description("Indicates whether the results of the JsonPath evaluation are written to the FlowFile content or a FlowFile attribute; if using attribute, must specify the Attribute Name property. If set to flowfile-content, only one JsonPath may be specified, and the property name is ignored.")
@ -57,6 +68,14 @@ public class EvaluateJsonPath extends AbstractProcessor {
.defaultValue(DESTINATION_CONTENT)
.build();
/**
 * Required property selecting the return type of the JsonPath evaluation results.
 * Accepts 'auto-detect' (the default), 'json' or 'string'.
 */
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
        .name("Return Type")
        .description("Indicates the desired return type of the JSON Path expressions. Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
        .required(true)
        // Each of the three return types is listed exactly once so that 'json' is selectable.
        .allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_JSON, RETURN_TYPE_STRING)
        .defaultValue(RETURN_TYPE_AUTO)
        .build();
// Relationships named "matched", "unmatched" and "failure"; each description string states when it is intended to be used.
public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified as a result").build();
public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles are routed to this relationship when the JsonPath does not match the content of the FlowFile and the Destination is set to flowfile-content").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when the JsonPath cannot be evaluated against the content of the FlowFile; for instance, if the FlowFile is not valid JSON").build();
@ -77,6 +96,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DESTINATION);
properties.add(RETURN_TYPE);
this.properties = Collections.unmodifiableList(properties);
}
@ -127,55 +147,110 @@ public class EvaluateJsonPath extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
final FlowFile flowFile = processSession.get();
if (flowFile == null) {
List<FlowFile> flowFiles = processSession.get(50);
if (flowFiles.isEmpty()) {
return;
}
// Determine the destination
final ProcessorLog logger = getLogger();
/* Build the JsonPath expressions from attributes */
final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : processContext.getProperties().entrySet()) {
if (!entry.getKey().isDynamic()) {
continue;
}
final JsonPath jsonPath = JsonPath.compile(entry.getValue());
attributeToJsonPathMap.put(entry.getKey().getName(), jsonPath);
}
final String destination = processContext.getProperty(DESTINATION).getValue();
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
// Parse the document once to support multiple path evaluations if specified
flowFileLoop:
for (FlowFile flowFile : flowFiles) {
// Validate the JSON document before attempting processing
final BooleanHolder validJsonHolder = new BooleanHolder(false);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (InputStreamReader inputStreamReader = new InputStreamReader(in)) {
/*
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition.
* Accordingly, a strict JSON approach is preferred in determining whether or not a document is valid.
*/
boolean validJson = JSONValue.isValidJsonStrict(new InputStreamReader(in));
if (validJson) {
DocumentContext ctx = JsonPath.parse(in);
contextHolder.set(ctx);
} else {
getLogger().error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile.getId()});
processSession.transfer(flowFile, REL_FAILURE);
boolean validJson = JSONValue.isValidJsonStrict(inputStreamReader);
validJsonHolder.set(validJson);
}
}
});
DocumentContext documentContext = contextHolder.get();
if (documentContext == null) {
return;
if (!validJsonHolder.get()) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile.getId()});
processSession.transfer(flowFile, REL_FAILURE);
continue flowFileLoop;
}
// Parse the document once into an associated context to support multiple path evaluations if specified
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
DocumentContext ctx = JsonPath.parse(in);
contextHolder.set(ctx);
}
}
});
final DocumentContext documentContext = contextHolder.get();
final Map<String, String> jsonPathResults = new HashMap<>();
// Iterate through all JsonPath entries specified
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final String evalResult = evaluatePathForContext(jsonPathExp, documentContext);
try {
switch (destination) {
case DESTINATION_ATTRIBUTE:
jsonPathResults.put(jsonPathAttrKey, evalResult);
break;
case DESTINATION_CONTENT:
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(evalResult.getBytes(StandardCharsets.UTF_8));
}
}
});
break;
}
processSession.transfer(flowFile, REL_MATCH);
} catch (PathNotFoundException e) {
getLogger().warn("FlowFile {} could not be read from.", new Object[]{flowFile.getId()}, e);
processSession.transfer(flowFile, REL_NO_MATCH);
logger.error("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
jsonPathResults.put(jsonPathAttrKey, "");
}
}
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
processSession.transfer(flowFile, REL_MATCH);
}
}
/**
 * Reads the given path from the supplied context and renders the outcome as text.
 *
 * @param path the compiled JsonPath expression to read
 * @param readCtx the context the path is read from
 * @return the value itself when the read yields a String; otherwise the result of passing it to
 *         {@code JSON_PROVIDER.toJson}
 */
private static String evaluatePathForContext(JsonPath path, ReadContext readCtx) {
    final Object evaluated = readCtx.read(path);

    // A lone String value is returned verbatim; anything else is serialized by the JSON provider.
    return (evaluated instanceof String)
            ? (String) evaluated
            : JSON_PROVIDER.toJson(evaluated);
}
private static class JsonPathValidator implements Validator {
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -50,4 +51,47 @@ public class TestEvaluateJsonPath {
testRunner.assertAllFlowFilesTransferred(EvaluateJsonPath.REL_FAILURE, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_FAILURE).get(0);
}
/**
 * Configures a Destination of flowfile-content together with two JsonPath properties and checks that
 * running the processor is rejected with an {@link AssertionError}.
 *
 * <p>The rejection is captured explicitly instead of via {@code @Test(expected = AssertionError.class)}:
 * with the annotation form, the AssertionError raised by a trailing {@code Assert.fail} would itself
 * satisfy the expectation, so the test could never fail.</p>
 */
@Test
public void testInvalidConfiguration_destinationContent_twoPaths() throws Exception {
    final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
    testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
    testRunner.setProperty("JsonPath1", "$[0]._id");
    testRunner.setProperty("JsonPath2", "$[0].name");
    testRunner.enqueue(JSON_SNIPPET);

    boolean rejected = false;
    try {
        testRunner.run();
    } catch (final AssertionError expected) {
        // Only an AssertionError raised by run() itself counts as the expected rejection.
        rejected = true;
    }

    Assert.assertTrue("Processor incorrectly ran with an invalid configuration of multiple paths specified as attributes for a destination of content.", rejected);
}
/**
 * Runs the processor with a Destination of flowfile-attribute and two JsonPath properties.
 * There are no explicit assertions; the test passes as long as the run raises nothing.
 */
@Test
public void testConfiguration_destinationAttributes_twoPaths() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new EvaluateJsonPath());

    // Two dynamic JsonPath properties alongside an attribute destination
    runner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
    runner.setProperty("JsonPath1", "$[0]._id");
    runner.setProperty("JsonPath2", "$[0].name");

    runner.enqueue(JSON_SNIPPET);
    runner.run();
}
/**
 * Extracts {@code $[0]._id} into an attribute and checks both the routing and the extracted value.
 */
@Test
public void testExtractPath_destinationAttribute() throws Exception {
    final String attributeName = "JsonPath";

    final TestRunner runner = TestRunners.newTestRunner(new EvaluateJsonPath());
    runner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
    runner.setProperty(attributeName, "$[0]._id");

    runner.enqueue(JSON_SNIPPET);
    runner.run();

    // The single enqueued FlowFile must be routed to the match relationship
    runner.assertAllFlowFilesTransferred(EvaluateJsonPath.REL_MATCH, 1);

    final MockFlowFile matched = runner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0);
    Assert.assertEquals("Transferred flow file did not have the correct result", "54df94072d5dbf7dc6340cc5", matched.getAttribute(attributeName));
}
}