mirror of https://github.com/apache/nifi.git
Removing the separate reads for validation preferring to do the read once and handle any exceptions.
This commit is contained in:
parent
57aa5dd63f
commit
162f02b12f
|
@ -20,7 +20,9 @@ import com.jayway.jsonpath.Configuration;
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
import com.jayway.jsonpath.InvalidPathException;
|
import com.jayway.jsonpath.InvalidPathException;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
|
import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider;
|
||||||
import com.jayway.jsonpath.spi.json.JsonProvider;
|
import com.jayway.jsonpath.spi.json.JsonProvider;
|
||||||
|
import net.minidev.json.parser.JSONParser;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.components.Validator;
|
import org.apache.nifi.components.Validator;
|
||||||
|
@ -28,9 +30,7 @@ import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
import org.apache.nifi.processors.standard.util.JsonUtils;
|
|
||||||
import org.apache.nifi.stream.io.BufferedInputStream;
|
import org.apache.nifi.stream.io.BufferedInputStream;
|
||||||
import org.apache.nifi.util.BooleanHolder;
|
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
import org.apache.nifi.util.ObjectHolder;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -46,7 +46,10 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
protected static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
|
private static final Configuration STRICT_PROVIDER_CONFIGURATION =
|
||||||
|
Configuration.builder().jsonProvider(new JsonSmartJsonProvider(JSONParser.MODE_RFC4627)).build();
|
||||||
|
|
||||||
|
private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
|
||||||
|
|
||||||
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
|
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -57,35 +60,23 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||||
} catch (InvalidPathException ipe) {
|
} catch (InvalidPathException ipe) {
|
||||||
error = ipe.toString();
|
error = ipe.toString();
|
||||||
}
|
}
|
||||||
return new ValidationResult.Builder().subject("JsonPath expression " + subject).valid(error == null).explanation(error).build();
|
return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
||||||
|
// Parse the document once into an associated context to support multiple path evaluations if specified
|
||||||
final BooleanHolder validJsonHolder = new BooleanHolder(false);
|
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
||||||
processSession.read(flowFile, new InputStreamCallback() {
|
processSession.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(InputStream in) throws IOException {
|
public void process(InputStream in) throws IOException {
|
||||||
validJsonHolder.set(JsonUtils.isValidJson(in));
|
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
|
||||||
|
DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(bufferedInputStream);
|
||||||
|
contextHolder.set(ctx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Parse the document once into an associated context to support multiple path evaluations if specified
|
|
||||||
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
|
||||||
|
|
||||||
if (validJsonHolder.get()) {
|
|
||||||
processSession.read(flowFile, new InputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in) throws IOException {
|
|
||||||
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
|
|
||||||
DocumentContext ctx = JsonPath.parse(in);
|
|
||||||
contextHolder.set(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return contextHolder.get();
|
return contextHolder.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,4 +98,5 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
return JSON_PROVIDER.toJson(jsonPathResult);
|
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.InvalidJsonException;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import com.jayway.jsonpath.PathNotFoundException;
|
import com.jayway.jsonpath.PathNotFoundException;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
@ -177,9 +178,10 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
|
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
|
||||||
}
|
}
|
||||||
|
|
||||||
final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, flowFile);
|
DocumentContext documentContext = null;
|
||||||
|
try {
|
||||||
if (documentContext == null) {
|
documentContext = validateAndEstablishJsonContext(processSession, flowFile);
|
||||||
|
} catch (InvalidJsonException e) {
|
||||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
|
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
|
||||||
processSession.transfer(flowFile, REL_FAILURE);
|
processSession.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
|
@ -187,7 +189,6 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
|
|
||||||
final Map<String, String> jsonPathResults = new HashMap<>();
|
final Map<String, String> jsonPathResults = new HashMap<>();
|
||||||
|
|
||||||
jsonPathEvalLoop:
|
|
||||||
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
|
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
|
||||||
|
|
||||||
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
|
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
|
||||||
|
@ -207,7 +208,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
|
logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
|
||||||
if (destination.equals(DESTINATION_ATTRIBUTE)) {
|
if (destination.equals(DESTINATION_ATTRIBUTE)) {
|
||||||
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
|
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
|
||||||
continue jsonPathEvalLoop;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
processSession.transfer(flowFile, REL_NO_MATCH);
|
processSession.transfer(flowFile, REL_NO_MATCH);
|
||||||
return;
|
return;
|
||||||
|
@ -235,6 +236,4 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
|
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
|
||||||
processSession.transfer(flowFile, REL_MATCH);
|
processSession.transfer(flowFile, REL_MATCH);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.InvalidJsonException;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import com.jayway.jsonpath.PathNotFoundException;
|
import com.jayway.jsonpath.PathNotFoundException;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
@ -94,10 +95,10 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
||||||
|
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
|
DocumentContext documentContext = null;
|
||||||
final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, original);
|
try {
|
||||||
|
documentContext = validateAndEstablishJsonContext(processSession, original);
|
||||||
if (documentContext == null) {
|
} catch (InvalidJsonException e) {
|
||||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
||||||
processSession.transfer(original, REL_FAILURE);
|
processSession.transfer(original, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue