diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index d79a6dec5b..65266fffd3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -151,8 +151,8 @@ public class EvaluateJsonPath extends AbstractProcessor { @Override public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException { - List flowFiles = processSession.get(50); - if (flowFiles.isEmpty()) { + FlowFile flowFile = processSession.get(); + if (flowFile == null) { return; } @@ -175,66 +175,61 @@ public class EvaluateJsonPath extends AbstractProcessor { returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR; } - flowFileLoop: - for (FlowFile flowFile : flowFiles) { + final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile); - final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile); - - if (documentContext == null) { - logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile}); - processSession.transfer(flowFile, REL_FAILURE); - continue flowFileLoop; - } - - final Map jsonPathResults = new HashMap<>(); - - jsonPathEvalLoop: - for (final Map.Entry attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) { - - String jsonPathAttrKey = attributeJsonPathEntry.getKey(); - JsonPath jsonPathExp = attributeJsonPathEntry.getValue(); - - final ObjectHolder resultHolder = new ObjectHolder<>(null); - try { - Object result = documentContext.read(jsonPathExp); - if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) { - logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.", - new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()}); - processSession.transfer(flowFile, REL_FAILURE); - continue flowFileLoop; - } - resultHolder.set(result); - } catch (PathNotFoundException e) { - logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e); - if (destination.equals(DESTINATION_ATTRIBUTE)) { - jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY); - continue jsonPathEvalLoop; - } else { - processSession.transfer(flowFile, REL_NO_MATCH); - continue flowFileLoop; - } - } - - final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get()); - switch (destination) { - case DESTINATION_ATTRIBUTE: - jsonPathResults.put(jsonPathAttrKey, resultRepresentation); - break; - case DESTINATION_CONTENT: - flowFile = processSession.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try (OutputStream outputStream = new BufferedOutputStream(out)) { - outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8)); - } - } - }); - break; - } - } - flowFile = processSession.putAllAttributes(flowFile, jsonPathResults); - processSession.transfer(flowFile, REL_MATCH); + if (documentContext == null) { + logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile}); + processSession.transfer(flowFile, REL_FAILURE); + return; } + + final Map jsonPathResults = new HashMap<>(); + + jsonPathEvalLoop: + for (final Map.Entry attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) { + + String jsonPathAttrKey = attributeJsonPathEntry.getKey(); + JsonPath jsonPathExp = attributeJsonPathEntry.getValue(); + + final ObjectHolder resultHolder = new ObjectHolder<>(null); + try { + Object result = documentContext.read(jsonPathExp); + if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) { + logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.", + new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()}); + processSession.transfer(flowFile, REL_FAILURE); + return; + } + resultHolder.set(result); + } catch (PathNotFoundException e) { + logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e); + if (destination.equals(DESTINATION_ATTRIBUTE)) { + jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY); + continue jsonPathEvalLoop; + } else { + processSession.transfer(flowFile, REL_NO_MATCH); + return; + } + } + + final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get()); + switch (destination) { + case DESTINATION_ATTRIBUTE: + jsonPathResults.put(jsonPathAttrKey, resultRepresentation); + case DESTINATION_CONTENT: + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8)); + } + } + }); + break; + } + } + flowFile = processSession.putAllAttributes(flowFile, jsonPathResults); + processSession.transfer(flowFile, REL_MATCH); } }