mirror of https://github.com/apache/nifi.git
Providing a SplitJson processor which will break JSON Arrays into their individual elements. Refactored supporting JsonUtils code and EvaluateJsonPath to reuse common functionality.
This commit is contained in:
parent
59ad194851
commit
2e05dcbbfd
|
@ -192,7 +192,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
|
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
|
||||||
try {
|
try {
|
||||||
Object result = documentContext.read(jsonPathExp);
|
Object result = documentContext.read(jsonPathExp);
|
||||||
if (returnType.equals(RETURN_TYPE_SCALAR) && !isScalar(result)) {
|
if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) {
|
||||||
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
|
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
|
||||||
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
|
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
|
||||||
processSession.transfer(flowFile, REL_FAILURE);
|
processSession.transfer(flowFile, REL_FAILURE);
|
||||||
|
@ -233,15 +233,11 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getResultRepresentation(Object jsonPathResult) {
|
private static String getResultRepresentation(Object jsonPathResult) {
|
||||||
if (isScalar(jsonPathResult)) {
|
if (JsonUtils.isJsonScalar(jsonPathResult)) {
|
||||||
return jsonPathResult.toString();
|
return jsonPathResult.toString();
|
||||||
}
|
}
|
||||||
return JsonUtils.JSON_PROVIDER.toJson(jsonPathResult);
|
return JsonUtils.JSON_PROVIDER.toJson(jsonPathResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isScalar(Object obj) {
|
|
||||||
return (obj instanceof String);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.JsonPath;
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
|
import org.apache.nifi.processor.*;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
import org.apache.nifi.processors.standard.util.JsonUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
@EventDriven
|
||||||
|
@SideEffectFree
|
||||||
|
@SupportsBatching
|
||||||
|
@Tags({"json", "split", "jsonpath"})
|
||||||
|
@CapabilityDescription("Splits a JSON File into multiple, separate FlowFiles for an array element specified by a JsonPath expression. "
|
||||||
|
+ "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split, "
|
||||||
|
+ "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
|
||||||
|
+ "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
|
||||||
|
public class SplitJson extends AbstractProcessor {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
|
||||||
|
.name("JsonPath Expression")
|
||||||
|
.description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(JsonUtils.JSON_PATH_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
|
||||||
|
public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build();
|
||||||
|
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified path does not exist), it will be routed to this relationship").build();
|
||||||
|
|
||||||
|
private List<PropertyDescriptor> properties;
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
|
properties.add(ARRAY_JSON_PATH_EXPRESSION);
|
||||||
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_ORIGINAL);
|
||||||
|
relationships.add(REL_SPLIT);
|
||||||
|
relationships.add(REL_FAILURE);
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
|
||||||
|
final FlowFile original = processSession.get();
|
||||||
|
if (original == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
|
|
||||||
|
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, original);
|
||||||
|
|
||||||
|
if (documentContext == null) {
|
||||||
|
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
||||||
|
processSession.transfer(original, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
|
||||||
|
final JsonPath jsonPath = JsonPath.compile(jsonPathExpression);
|
||||||
|
|
||||||
|
final List<FlowFile> segments = new ArrayList<>();
|
||||||
|
|
||||||
|
Object jsonPathResult = documentContext.read(jsonPath);
|
||||||
|
|
||||||
|
if (!(jsonPathResult instanceof List)) {
|
||||||
|
logger.error("The evaluated value {} of {} was not an array compatible type and cannot be split.",
|
||||||
|
new Object[]{jsonPathResult, jsonPath.getPath()});
|
||||||
|
processSession.transfer(original, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List resultList = (List) jsonPathResult;
|
||||||
|
|
||||||
|
for (final Object resultSegment : resultList) {
|
||||||
|
FlowFile split = processSession.create(original);
|
||||||
|
split = processSession.write(split, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
String resultSegmentContent;
|
||||||
|
if (JsonUtils.isJsonScalar(resultSegment)) {
|
||||||
|
resultSegmentContent = resultSegment.toString();
|
||||||
|
} else {
|
||||||
|
resultSegmentContent = JsonUtils.JSON_PROVIDER.toJson(resultSegment);
|
||||||
|
}
|
||||||
|
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
segments.add(split);
|
||||||
|
}
|
||||||
|
|
||||||
|
processSession.transfer(segments, REL_SPLIT);
|
||||||
|
processSession.transfer(original, REL_ORIGINAL);
|
||||||
|
logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,6 +35,8 @@ import org.apache.nifi.util.ObjectHolder;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides utilities for interacting with JSON elements and JsonPath expressions and results
|
* Provides utilities for interacting with JSON elements and JsonPath expressions and results
|
||||||
|
@ -101,4 +103,10 @@ public class JsonUtils {
|
||||||
|
|
||||||
return isValid;
|
return isValid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean isJsonScalar(Object obj) {
|
||||||
|
// For the default provider, a Map or List is able to be handled as a JSON entity
|
||||||
|
return !(obj instanceof Map || obj instanceof List);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,7 @@ org.apache.nifi.processors.standard.ScanAttribute
|
||||||
org.apache.nifi.processors.standard.ScanContent
|
org.apache.nifi.processors.standard.ScanContent
|
||||||
org.apache.nifi.processors.standard.SegmentContent
|
org.apache.nifi.processors.standard.SegmentContent
|
||||||
org.apache.nifi.processors.standard.SplitContent
|
org.apache.nifi.processors.standard.SplitContent
|
||||||
|
org.apache.nifi.processors.standard.SplitJson
|
||||||
org.apache.nifi.processors.standard.SplitText
|
org.apache.nifi.processors.standard.SplitText
|
||||||
org.apache.nifi.processors.standard.SplitXml
|
org.apache.nifi.processors.standard.SplitXml
|
||||||
org.apache.nifi.processors.standard.TransformXml
|
org.apache.nifi.processors.standard.TransformXml
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.StringUtils;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -158,7 +159,7 @@ public class TestEvaluateJsonPath {
|
||||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||||
Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathIdAttrKey));
|
Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathIdAttrKey));
|
||||||
Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", "", out.getAttribute(jsonPathNameAttrKey));
|
Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", StringUtils.EMPTY, out.getAttribute(jsonPathNameAttrKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -0,0 +1,115 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
public class TestSplitJson {
|
||||||
|
|
||||||
|
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
|
||||||
|
private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
|
||||||
|
|
||||||
|
@Test(expected = AssertionError.class)
|
||||||
|
public void testInvalidJsonPath() {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$..");
|
||||||
|
|
||||||
|
Assert.fail("An improper JsonPath expression was not detected as being invalid.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidJsonDocument() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$");
|
||||||
|
|
||||||
|
testRunner.enqueue(XML_SNIPPET);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertAllFlowFilesTransferred(SplitJson.REL_FAILURE, 1);
|
||||||
|
final MockFlowFile out = testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0);
|
||||||
|
// Verify that the content was unchanged
|
||||||
|
out.assertContentEquals(XML_SNIPPET);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_nonArrayResult() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0]._id");
|
||||||
|
|
||||||
|
testRunner.enqueue(JSON_SNIPPET);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
Relationship expectedRel = SplitJson.REL_FAILURE;
|
||||||
|
|
||||||
|
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||||
|
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||||
|
out.assertContentEquals(JSON_SNIPPET);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_arrayResult_oneValue() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range[?(@ == 0)]");
|
||||||
|
|
||||||
|
testRunner.enqueue(JSON_SNIPPET);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1);
|
||||||
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
|
||||||
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_arrayResult_multipleValues() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range");
|
||||||
|
|
||||||
|
testRunner.enqueue(JSON_SNIPPET);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
int numSplitsExpected = 10;
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected);
|
||||||
|
final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
|
||||||
|
originalOut.assertContentEquals(JSON_SNIPPET);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_arrayResult_nonScalarValues() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[*].name");
|
||||||
|
|
||||||
|
testRunner.enqueue(JSON_SNIPPET);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
|
||||||
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
|
||||||
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue