Refactoring common JSON/JsonPath functionality into JsonUtils

This commit is contained in:
Aldrin Piri 2015-02-17 10:26:18 -05:00
parent 5a81f19b25
commit 59ad194851
3 changed files with 111 additions and 89 deletions

View File

@ -16,12 +16,9 @@
*/
package org.apache.nifi.processors.standard;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import net.minidev.json.JSONValue;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -34,17 +31,12 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JsonPathUtils;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.processors.standard.util.JsonUtils;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
@ -95,8 +87,6 @@ public class EvaluateJsonPath extends AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
@Override
protected void init(final ProcessorInitializationContext context) {
@ -150,7 +140,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(false)
.addValidator(JsonPathUtils.JSON_PATH_VALIDATOR)
.addValidator(JsonUtils.JSON_PATH_VALIDATOR)
.required(false)
.dynamic(true)
.build();
@ -182,42 +172,15 @@ public class EvaluateJsonPath extends AbstractProcessor {
flowFileLoop:
for (FlowFile flowFile : flowFiles) {
// Validate the JSON document before attempting processing
final BooleanHolder validJsonHolder = new BooleanHolder(false);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (InputStreamReader inputStreamReader = new InputStreamReader(in)) {
/*
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition.
* Accordingly, a strict JSON approach is preferred in determining whether or not a document is valid.
*/
boolean validJson = JSONValue.isValidJsonStrict(inputStreamReader);
validJsonHolder.set(validJson);
}
}
});
if (!validJsonHolder.get()) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile.getId()});
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile);
if (documentContext == null) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
processSession.transfer(flowFile, REL_FAILURE);
continue flowFileLoop;
}
// Parse the document once into an associated context to support multiple path evaluations if specified
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
DocumentContext ctx = JsonPath.parse(in);
contextHolder.set(ctx);
}
}
});
final DocumentContext documentContext = contextHolder.get();
final Map<String, String> jsonPathResults = new HashMap<>();
jsonPathEvalLoop:
@ -226,7 +189,6 @@ public class EvaluateJsonPath extends AbstractProcessor {
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
try {
Object result = documentContext.read(jsonPathExp);
@ -274,7 +236,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
if (isScalar(jsonPathResult)) {
return jsonPathResult.toString();
}
return JSON_PROVIDER.toJson(jsonPathResult);
return JsonUtils.JSON_PROVIDER.toJson(jsonPathResult);
}
private static boolean isScalar(Object obj) {

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
/**
* Provides utilities for interacting with JsonPath expressions and results
*
* @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
*/
public class JsonPathUtils {
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
String error = null;
try {
JsonPath compile = JsonPath.compile(input);
} catch (InvalidPathException ipe) {
error = ipe.toString();
}
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
}
};
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.json.JsonProvider;
import net.minidev.json.JSONValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* Provides utilities for interacting with JSON elements and JsonPath expressions and results
*
* @see <a href="http://json.org">http://json.org</a>
* @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
*/
public class JsonUtils {
public static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
String error = null;
try {
JsonPath compile = JsonPath.compile(input);
} catch (InvalidPathException ipe) {
error = ipe.toString();
}
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
}
};
public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
final BooleanHolder validJsonHolder = new BooleanHolder(false);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
validJsonHolder.set(JsonUtils.isValidJson(in));
}
});
// Parse the document once into an associated context to support multiple path evaluations if specified
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
if (validJsonHolder.get()) {
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
DocumentContext ctx = JsonPath.parse(in);
contextHolder.set(ctx);
}
}
});
}
return contextHolder.get();
}
public static boolean isValidJson(InputStream inputStream) throws IOException {
boolean isValid = false;
/*
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition.
* Accordingly, a strict JSON approach is preferred in determining whether or not a document is valid.
*/
try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream)) {
isValid = JSONValue.isValidJsonStrict(inputStreamReader);
}
return isValid;
}
}