mirror of https://github.com/apache/nifi.git
Refining logic of how errors are handled on a per destination basis. Adding supporting tests to ensure contract is met.
This commit is contained in:
parent
5b145e10e8
commit
bcebba6632
|
@ -50,15 +50,23 @@ import java.util.*;
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@Tags({"JSON", "evaluate", "JsonPath"})
|
@Tags({"JSON", "evaluate", "JsonPath"})
|
||||||
@CapabilityDescription("")
|
@CapabilityDescription("Evaluates one or more JsonPath expressions against the content of a FlowFile. The results of those expressions are assigned to "
|
||||||
|
+ "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the "
|
||||||
|
+ "Processor. JsonPaths are entered by adding user-defined properties; the name of the property maps to the Attribute "
|
||||||
|
+ "Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). "
|
||||||
|
+ "The value of the property must be a valid JsonPath expression. If the JsonPath evaluates to a JSON array or JSON object and the Return Type is "
|
||||||
|
+ "set to 'scalar' the FlowFile will be unmodified and will be routed to failure. If the JsonPath does not "
|
||||||
|
+ "evaluate to a scalar, the FlowFile will be routed to 'unmatched' without having its contents modified. If Destination is "
|
||||||
|
+ "flowfile-attribute and the expression matches nothing, attributes will be created with empty strings as the value, and the "
|
||||||
|
+ "FlowFile will always be routed to 'matched.' If Destination is 'flowfile-content' and the expression matches nothing, "
|
||||||
|
+ "the FlowFile will be routed to 'unmatched' without having its contents modified.")
|
||||||
public class EvaluateJsonPath extends AbstractProcessor {
|
public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
|
|
||||||
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
||||||
public static final String DESTINATION_CONTENT = "flowfile-content";
|
public static final String DESTINATION_CONTENT = "flowfile-content";
|
||||||
|
|
||||||
public static final String RETURN_TYPE_AUTO = "auto-detect";
|
|
||||||
public static final String RETURN_TYPE_JSON = "json";
|
public static final String RETURN_TYPE_JSON = "json";
|
||||||
public static final String RETURN_TYPE_STRING = "string";
|
public static final String RETURN_TYPE_SCALAR = "scalar";
|
||||||
|
|
||||||
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
|
||||||
.name("Destination")
|
.name("Destination")
|
||||||
|
@ -72,8 +80,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
.name("Return Type")
|
.name("Return Type")
|
||||||
.description("Indicates the desired return type of the JSON Path expressions. Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
|
.description("Indicates the desired return type of the JSON Path expressions. Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_JSON, RETURN_TYPE_STRING)
|
.allowableValues(RETURN_TYPE_JSON, RETURN_TYPE_SCALAR)
|
||||||
.defaultValue(RETURN_TYPE_AUTO)
|
.defaultValue(RETURN_TYPE_JSON)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified as a result").build();
|
public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified as a result").build();
|
||||||
|
@ -208,30 +216,44 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
final Map<String, String> jsonPathResults = new HashMap<>();
|
final Map<String, String> jsonPathResults = new HashMap<>();
|
||||||
|
|
||||||
// Iterate through all JsonPath entries specified
|
// Iterate through all JsonPath entries specified
|
||||||
|
jsonPathEvalLoop:
|
||||||
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
|
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
|
||||||
|
|
||||||
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
|
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
|
||||||
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
|
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
|
||||||
|
|
||||||
|
|
||||||
final ObjectHolder<String> resultHolder = new ObjectHolder<>("");
|
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
|
||||||
try {
|
try {
|
||||||
resultHolder.set(evaluatePathForContext(jsonPathExp, documentContext));
|
Object result = documentContext.read(jsonPathExp);
|
||||||
|
if (RETURN_TYPE.getName().equals(RETURN_TYPE_SCALAR) && !isScalar(result)) {
|
||||||
|
logger.error("Unable to return a scalar value for a JsonPath {} for FlowFile {}. Transferring to {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), REL_FAILURE.getName()});
|
||||||
|
processSession.transfer(flowFile, REL_FAILURE);
|
||||||
|
continue flowFileLoop;
|
||||||
|
}
|
||||||
|
resultHolder.set(result);
|
||||||
} catch (PathNotFoundException e) {
|
} catch (PathNotFoundException e) {
|
||||||
logger.error("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
|
logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
|
||||||
|
if (destination.equals(DESTINATION_ATTRIBUTE)) {
|
||||||
jsonPathResults.put(jsonPathAttrKey, "");
|
jsonPathResults.put(jsonPathAttrKey, "");
|
||||||
|
continue jsonPathEvalLoop;
|
||||||
|
} else {
|
||||||
|
processSession.transfer(flowFile, REL_NO_MATCH);
|
||||||
|
continue flowFileLoop;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String resultRepresentation = getResultRepresentation(resultHolder.get());
|
||||||
switch (destination) {
|
switch (destination) {
|
||||||
case DESTINATION_ATTRIBUTE:
|
case DESTINATION_ATTRIBUTE:
|
||||||
jsonPathResults.put(jsonPathAttrKey, resultHolder.get());
|
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
||||||
break;
|
break;
|
||||||
case DESTINATION_CONTENT:
|
case DESTINATION_CONTENT:
|
||||||
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
|
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final OutputStream out) throws IOException {
|
public void process(final OutputStream out) throws IOException {
|
||||||
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
outputStream.write(resultHolder.get().getBytes(StandardCharsets.UTF_8));
|
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -243,16 +265,18 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String evaluatePathForContext(JsonPath path, ReadContext readCtx) {
|
private static String getResultRepresentation(Object jsonPathResult) {
|
||||||
Object pathResult = readCtx.read(path);
|
if (isScalar(jsonPathResult)) {
|
||||||
/*
|
return jsonPathResult.toString();
|
||||||
* A given path could be a JSON object or a single value, if a sole value, treat as a String; otherwise, return the
|
|
||||||
* representative JSON.
|
|
||||||
*/
|
|
||||||
if (pathResult instanceof String) {
|
|
||||||
return pathResult.toString();
|
|
||||||
}
|
}
|
||||||
return JSON_PROVIDER.toJson(pathResult);
|
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isScalar(Object obj) {
|
||||||
|
/*
|
||||||
|
* A given path could be a JSON object or a single/scalar value
|
||||||
|
*/
|
||||||
|
return (obj instanceof String);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class JsonPathValidator implements Validator {
|
private static class JsonPathValidator implements Validator {
|
||||||
|
|
|
@ -180,7 +180,7 @@ public class TestEvaluateJsonPath {
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExtractPath_destinationAttribute_indefiniteResult() throws Exception {
|
public void testExtractPath_destinationContent_indefiniteResult() throws Exception {
|
||||||
String jsonPathAttrKey = "friends.indefinite.id.list";
|
String jsonPathAttrKey = "friends.indefinite.id.list";
|
||||||
|
|
||||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||||
|
@ -197,7 +197,7 @@ public class TestEvaluateJsonPath {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExtractPath_destinationAttribute_indefiniteResult_operators() throws Exception {
|
public void testExtractPath_destinationContent_indefiniteResult_operators() throws Exception {
|
||||||
String jsonPathAttrKey = "friends.indefinite.id.list";
|
String jsonPathAttrKey = "friends.indefinite.id.list";
|
||||||
|
|
||||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||||
|
@ -212,4 +212,20 @@ public class TestEvaluateJsonPath {
|
||||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals("[0,1,2]");
|
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals("[0,1,2]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouteUnmatched_destinationContent_noMatch() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||||
|
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||||
|
testRunner.setProperty("jsonPath", "$[0].nonexistent.key");
|
||||||
|
|
||||||
|
testRunner.enqueue(JSON_SNIPPET);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
Relationship expectedRel = EvaluateJsonPath.REL_NO_MATCH;
|
||||||
|
|
||||||
|
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||||
|
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue