NIFI-476: Addressing the handling of null values in JsonPath-related processors and providing configuration to treat them as an empty string or the string "null".

Signed-off-by: Aldrin Piri <aldrinpiri@gmail.com>
This commit is contained in:
Aldrin Piri 2015-04-17 23:30:26 -04:00
parent c5b961be5b
commit fb2206a9bb
5 changed files with 217 additions and 23 deletions

View File

@ -22,6 +22,7 @@ import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import net.minidev.json.parser.JSONParser;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
@ -35,8 +36,10 @@ import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Provides common functionality used for processors interacting and manipulating JSON data via JsonPath.
@ -51,6 +54,24 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
/** Allowable-value label selecting the empty string as the rendering of a null JsonPath result. */
static final String EMPTY_STRING_OPTION = "empty string";

/** Allowable-value label selecting the literal text {@code null} as the rendering of a null JsonPath result. */
static final String NULL_STRING_OPTION = "the string 'null'";

/**
 * Maps each allowable value of {@link #NULL_VALUE_DEFAULT_REPRESENTATION} to the text that is
 * substituted for a null JsonPath result.
 * <p>
 * The map is insertion-ordered, so the allowable values are offered in declaration order, and it
 * is unmodifiable, so this shared static state cannot be altered after class initialization.
 */
static final Map<String, String> NULL_REPRESENTATION_MAP;

static {
    final Map<String, String> representations = new LinkedHashMap<>();
    representations.put(EMPTY_STRING_OPTION, "");
    representations.put(NULL_STRING_OPTION, "null");
    NULL_REPRESENTATION_MAP = Collections.unmodifiableMap(representations);
}

/**
 * Required property choosing how a null JsonPath result is represented. Its allowable values are
 * the keys of {@link #NULL_REPRESENTATION_MAP}; the default is {@link #EMPTY_STRING_OPTION}.
 */
public static final PropertyDescriptor NULL_VALUE_DEFAULT_REPRESENTATION = new PropertyDescriptor.Builder()
        .name("Null Value Representation")
        .description("Indicates the desired representation of JSON Path expressions resulting in a null value.")
        .required(true)
        .allowableValues(NULL_REPRESENTATION_MAP.keySet())
        .defaultValue(EMPTY_STRING_OPTION)
        .build();
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
// Parse the document once into an associated context to support multiple path evaluations if specified
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
@ -79,9 +100,9 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
return !(obj instanceof Map || obj instanceof List);
}
// NOTE(review): this span appears to come from a diff hunk. The one-argument signature and the
// bare toString() return look like the replaced lines, each followed directly by its
// replacement; confirm against the repository before treating this as compilable source.
/**
 * Converts a JsonPath evaluation result into its String form.
 *
 * In the two-argument form, defaultValue is the text returned when jsonPathResult is null.
 * Values that are neither a Map nor a List are rendered with a toString-style conversion;
 * Maps and Lists are serialized through JSON_PROVIDER.
 */
static String getResultRepresentation(Object jsonPathResult) {
static String getResultRepresentation(Object jsonPathResult, String defaultValue) {
// isJsonScalar is true for anything that is not a Map or List, which includes null.
if (isJsonScalar(jsonPathResult)) {
// Calling toString() directly would throw a NullPointerException for a null result.
return jsonPathResult.toString();
// Objects.toString returns defaultValue when the result is null, otherwise result.toString().
return Objects.toString(jsonPathResult, defaultValue);
}
// Map and List results are handed to the JSON provider for serialization.
return JSON_PROVIDER.toJson(jsonPathResult);
}

View File

@ -16,20 +16,10 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -52,10 +42,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@EventDriven
@SideEffectFree
@ -72,7 +64,7 @@ import com.jayway.jsonpath.PathNotFoundException;
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with "
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
@DynamicProperty(name="A FlowFile attribute(if <Destination> is set to 'flowfile-attribute')", value="A JsonPath expression", description="If <Destination>='flowfile-attribute' then that FlowFile attribute " +
@DynamicProperty(name = "A FlowFile attribute(if <Destination> is set to 'flowfile-attribute')", value = "A JsonPath expression", description = "If <Destination>='flowfile-attribute' then that FlowFile attribute " +
"will be set to any JSON objects that match the JsonPath. If <Destination>='flowfile-content' then the FlowFile content will be updated to any JSON objects that match the JsonPath.")
public class EvaluateJsonPath extends AbstractJsonPathProcessor {
@ -119,6 +111,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DESTINATION);
properties.add(RETURN_TYPE);
properties.add(NULL_VALUE_DEFAULT_REPRESENTATION);
this.properties = Collections.unmodifiableList(properties);
}
@ -211,6 +204,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
final ProcessorLog logger = getLogger();
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
/* Build the JsonPath expressions from attributes */
final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
@ -265,7 +261,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
}
}
final String resultRepresentation = getResultRepresentation(resultHolder.get());
final String resultRepresentation = getResultRepresentation(resultHolder.get(), nullDefaultValue);
switch (destination) {
case DESTINATION_ATTRIBUTE:
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);

View File

@ -74,6 +74,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ARRAY_JSON_PATH_EXPRESSION);
properties.add(NULL_VALUE_DEFAULT_REPRESENTATION);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@ -142,6 +143,8 @@ public class SplitJson extends AbstractJsonPathProcessor {
}
final JsonPath jsonPath = JSON_PATH_REF.get();
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
final List<FlowFile> segments = new ArrayList<>();
@ -168,7 +171,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
split = processSession.write(split, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
String resultSegmentContent = getResultRepresentation(resultSegment);
String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue);
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
});

View File

@ -16,7 +16,11 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.TestRunner;
@ -24,9 +28,14 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import static org.junit.Assert.assertEquals;
public class TestEvaluateJsonPath {
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
@ -261,4 +270,81 @@ public class TestEvaluateJsonPath {
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
}
/**
 * Evaluates three JsonPath expressions into attributes without setting the null representation
 * property, then checks that a present string yields its value while both a missing path and an
 * explicit JSON null yield the empty string.
 */
@Test
public void testNullInput() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new EvaluateJsonPath());
    runner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON);
    runner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
    runner.setProperty("stringField", "$.stringField");
    runner.setProperty("missingField", "$.missingField");
    runner.setProperty("nullField", "$.nullField");

    // Document with one populated field and one explicit null; "missingField" is absent on purpose.
    final byte[] payload = "{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8);

    final ProcessSession session = runner.getProcessSessionFactory().createSession();
    final FlowFile input = session.write(session.create(), new OutputStreamCallback() {
        @Override
        public void process(final OutputStream rawOut) throws IOException {
            try (OutputStream buffered = new BufferedOutputStream(rawOut)) {
                buffered.write(payload);
            }
        }
    });

    runner.enqueue(input);
    runner.run();

    runner.assertTransferCount(EvaluateJsonPath.REL_MATCH, 1);
    final FlowFile matched = runner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0);

    assertEquals("String Value", matched.getAttribute("stringField"));
    assertEquals("Missing Value", "", matched.getAttribute("missingField"));
    assertEquals("Null Value", "", matched.getAttribute("nullField"));
}
/**
 * Same scenario as the default-representation test, but with the null representation property
 * set to NULL_STRING_OPTION: the explicit JSON null must surface as the text "null", while the
 * missing path is still expected to be the empty string.
 */
@Test
public void testNullInput_nullStringRepresentation() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new EvaluateJsonPath());
    runner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON);
    runner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
    runner.setProperty(EvaluateJsonPath.NULL_VALUE_DEFAULT_REPRESENTATION, AbstractJsonPathProcessor.NULL_STRING_OPTION);
    runner.setProperty("stringField", "$.stringField");
    runner.setProperty("missingField", "$.missingField");
    runner.setProperty("nullField", "$.nullField");

    // Document with one populated field and one explicit null; "missingField" is absent on purpose.
    final byte[] payload = "{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8);

    final ProcessSession session = runner.getProcessSessionFactory().createSession();
    final FlowFile input = session.write(session.create(), new OutputStreamCallback() {
        @Override
        public void process(final OutputStream rawOut) throws IOException {
            try (OutputStream buffered = new BufferedOutputStream(rawOut)) {
                buffered.write(payload);
            }
        }
    });

    runner.enqueue(input);
    runner.run();

    runner.assertTransferCount(EvaluateJsonPath.REL_MATCH, 1);
    final FlowFile matched = runner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0);

    assertEquals("String Value", matched.getAttribute("stringField"));
    assertEquals("Missing Value", "", matched.getAttribute("missingField"));
    assertEquals("Null Value", "null", matched.getAttribute("nullField"));
}
}

View File

@ -16,13 +16,20 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -123,4 +130,85 @@ public class TestSplitJson {
testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1);
testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0).assertContentEquals(JSON_SNIPPET);
}
/**
 * Points the split expression at a field whose value is JSON null and expects the FlowFile to be
 * transferred to the failure relationship.
 */
@Test
public void testSplit_pathToNullValue() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new SplitJson());
    runner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.nullField");

    final byte[] payload = "{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8);

    final ProcessSession session = runner.getProcessSessionFactory().createSession();
    final FlowFile input = session.write(session.create(), new OutputStreamCallback() {
        @Override
        public void process(final OutputStream rawOut) throws IOException {
            try (OutputStream buffered = new BufferedOutputStream(rawOut)) {
                buffered.write(payload);
            }
        }
    });

    runner.enqueue(input);
    runner.run();

    runner.assertTransferCount(SplitJson.REL_FAILURE, 1);
}
/**
 * Splits an array of three JSON nulls without setting the null representation property and
 * expects three split FlowFiles, each with empty content.
 */
@Test
public void testSplit_pathToArrayWithNulls_emptyStringRepresentation() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new SplitJson());
    runner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.arrayOfNulls");

    final byte[] payload = "{\"stringField\": \"String Value\", \"arrayOfNulls\": [null, null, null]}".getBytes(StandardCharsets.UTF_8);

    final ProcessSession session = runner.getProcessSessionFactory().createSession();
    final FlowFile input = session.write(session.create(), new OutputStreamCallback() {
        @Override
        public void process(final OutputStream rawOut) throws IOException {
            try (OutputStream buffered = new BufferedOutputStream(rawOut)) {
                buffered.write(payload);
            }
        }
    });

    runner.enqueue(input);
    runner.run();

    // One split per array element, and every split must be empty.
    final int splitCount = 3;
    runner.assertTransferCount(SplitJson.REL_SPLIT, splitCount);
    int index = 0;
    while (index < splitCount) {
        runner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(index).assertContentEquals("");
        index++;
    }
}
/**
 * Splits an array of three JSON nulls with the null representation property set to
 * NULL_STRING_OPTION and expects three split FlowFiles, each containing the text "null".
 */
@Test
public void testSplit_pathToArrayWithNulls_nullStringRepresentation() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new SplitJson());
    runner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.arrayOfNulls");
    runner.setProperty(SplitJson.NULL_VALUE_DEFAULT_REPRESENTATION, AbstractJsonPathProcessor.NULL_STRING_OPTION);

    final byte[] payload = "{\"stringField\": \"String Value\", \"arrayOfNulls\": [null, null, null]}".getBytes(StandardCharsets.UTF_8);

    final ProcessSession session = runner.getProcessSessionFactory().createSession();
    final FlowFile input = session.write(session.create(), new OutputStreamCallback() {
        @Override
        public void process(final OutputStream rawOut) throws IOException {
            try (OutputStream buffered = new BufferedOutputStream(rawOut)) {
                buffered.write(payload);
            }
        }
    });

    runner.enqueue(input);
    runner.run();

    // One split per array element, and every split must carry the word null.
    final int splitCount = 3;
    runner.assertTransferCount(SplitJson.REL_SPLIT, splitCount);
    int index = 0;
    while (index < splitCount) {
        runner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(index).assertContentEquals("null");
        index++;
    }
}
}