From 162f02b12fd7b6d1a78efab09942ceffea49e07d Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Sat, 28 Feb 2015 21:24:25 -0500 Subject: [PATCH] Removing the separate reads for validation preferring to do the read once and handle any exceptions. --- .../standard/AbstractJsonPathProcessor.java | 36 ++++++++----------- .../processors/standard/EvaluateJsonPath.java | 15 ++++---- .../nifi/processors/standard/SplitJson.java | 9 ++--- 3 files changed, 26 insertions(+), 34 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java index 70efe38fc4..02547f3aba 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -20,7 +20,9 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider; import com.jayway.jsonpath.spi.json.JsonProvider; +import net.minidev.json.parser.JSONParser; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; @@ -28,9 +30,7 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processors.standard.util.JsonUtils; import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.util.BooleanHolder; import org.apache.nifi.util.ObjectHolder; import java.io.IOException; @@ -46,7 +46,10 @@ import java.util.Map; */ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { - protected static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider(); + private static final Configuration STRICT_PROVIDER_CONFIGURATION = + Configuration.builder().jsonProvider(new JsonSmartJsonProvider(JSONParser.MODE_RFC4627)).build(); + + private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider(); public static final Validator JSON_PATH_VALIDATOR = new Validator() { @Override @@ -57,35 +60,23 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { } catch (InvalidPathException ipe) { error = ipe.toString(); } - return new ValidationResult.Builder().subject("JsonPath expression " + subject).valid(error == null).explanation(error).build(); + return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build(); } }; static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) { - - final BooleanHolder validJsonHolder = new BooleanHolder(false); + // Parse the document once into an associated context to support multiple path evaluations if specified + final ObjectHolder contextHolder = new ObjectHolder<>(null); processSession.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { - validJsonHolder.set(JsonUtils.isValidJson(in)); + try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) { + DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(bufferedInputStream); + contextHolder.set(ctx); + } } }); - // Parse the document once into an associated context to support multiple path evaluations if specified - final ObjectHolder contextHolder = new ObjectHolder<>(null); - - if (validJsonHolder.get()) { - processSession.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) { - DocumentContext ctx = JsonPath.parse(in); - contextHolder.set(ctx); - } - } - }); - } - return contextHolder.get(); } @@ -107,4 +98,5 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { } return JSON_PROVIDER.toJson(jsonPathResult); } + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index ed0f07c12d..1b89dee840 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import org.apache.nifi.annotation.behavior.EventDriven; @@ -177,9 +178,10 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR; } - final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, flowFile); - - if (documentContext == null) { + DocumentContext documentContext = null; + try { + documentContext = validateAndEstablishJsonContext(processSession, flowFile); + } catch (InvalidJsonException e) { logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile}); processSession.transfer(flowFile, REL_FAILURE); return; @@ -187,7 +189,6 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { final Map jsonPathResults = new HashMap<>(); - jsonPathEvalLoop: for (final Map.Entry attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) { String jsonPathAttrKey = attributeJsonPathEntry.getKey(); @@ -207,7 +208,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e); if (destination.equals(DESTINATION_ATTRIBUTE)) { jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY); - continue jsonPathEvalLoop; + continue; } else { processSession.transfer(flowFile, REL_NO_MATCH); return; @@ -235,6 +236,4 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { flowFile = processSession.putAllAttributes(flowFile, jsonPathResults); processSession.transfer(flowFile, REL_MATCH); } - - -} +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index fe2216a7b6..5177bddadc 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import org.apache.nifi.annotation.behavior.EventDriven; @@ -94,10 +95,10 @@ public class SplitJson extends AbstractJsonPathProcessor { final ProcessorLog logger = getLogger(); - - final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, original); - - if (documentContext == null) { + DocumentContext documentContext = null; + try { + documentContext = validateAndEstablishJsonContext(processSession, original); + } catch (InvalidJsonException e) { logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original}); processSession.transfer(original, REL_FAILURE); return;