NIFI-2632: Added fragment attributes to SplitJson and SplitXml

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #919
Authored by Matt Burgess on 2016-08-23 09:28:49 -04:00; committed by Yolanda M. Davis
parent 5d1a4f343f
commit ee9bd94082
4 changed files with 105 additions and 48 deletions

SplitJson.java

@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -25,6 +23,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
@@ -33,18 +33,20 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.jayway.jsonpath.DocumentContext;
@@ -61,6 +63,16 @@ import com.jayway.jsonpath.PathNotFoundException;
         + "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split,' "
         + "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
         + "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier",
+                description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
+        @WritesAttribute(attribute = "fragment.index",
+                description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count",
+                description = "The number of split FlowFiles generated from the parent FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile")
+})
 public class SplitJson extends AbstractJsonPathProcessor {
 
     public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
@@ -184,20 +196,29 @@ public class SplitJson extends AbstractJsonPathProcessor {
         }
 
         List resultList = (List) jsonPathResult;
 
-        for (final Object resultSegment : resultList) {
+        AtomicInteger jsonLineCount = new AtomicInteger(0);
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+        for (int i = 0; i < resultList.size(); i++) {
+            Object resultSegment = resultList.get(i);
             FlowFile split = processSession.create(original);
-            split = processSession.write(split, new OutputStreamCallback() {
-                @Override
-                public void process(OutputStream out) throws IOException {
+            split = processSession.write(split, (out) -> {
                     String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue);
                     out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
                 }
-            });
+            );
+            split = processSession.putAttribute(split, "fragment.identifier", fragmentIdentifier);
+            split = processSession.putAttribute(split, "fragment.index", Integer.toString(i));
+            split = processSession.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
             segments.add(split);
+            jsonLineCount.incrementAndGet();
         }
 
-        processSession.transfer(segments, REL_SPLIT);
+        segments.forEach((segment) -> {
+            segment = processSession.putAttribute(segment, "fragment.count", Integer.toString(jsonLineCount.get()));
+            processSession.transfer(segment, REL_SPLIT);
+        });
         processSession.transfer(original, REL_ORIGINAL);
         logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
     }
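Both processors now stamp the same four attributes onto every split. For reference, the pattern could be factored into a helper like the sketch below; the class and method names are hypothetical and not part of this commit, but the attribute names and the fact that ProcessSession.putAttribute returns a new FlowFile reference (which is why the result is reassigned each time) match the code above. The commit reads the filename from the split itself, which inherits it from the parent at create time; the sketch reads it from the parent directly, which yields the same value.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;

public class FragmentAttributeUtil {

    // Stamps fragment.identifier, fragment.index, fragment.count and
    // segment.original.filename onto each split, mirroring SplitJson/SplitXml.
    public static List<FlowFile> applyFragmentAttributes(final ProcessSession session, final FlowFile original, final List<FlowFile> splits) {
        final String fragmentIdentifier = UUID.randomUUID().toString();
        final String parentFilename = original.getAttribute(CoreAttributes.FILENAME.key());
        final List<FlowFile> updated = new ArrayList<>(splits.size());
        int index = 0;
        for (FlowFile split : splits) {
            // putAttribute returns a new FlowFile reference; the returned object must be kept
            split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
            split = session.putAttribute(split, "fragment.index", Integer.toString(index++));
            split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
            split = session.putAttribute(split, "segment.original.filename", parentFilename);
            updated.add(split);
        }
        return updated;
    }
}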

SplitXml.java

@@ -16,9 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,7 +25,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.parsers.SAXParser;
@@ -39,18 +39,19 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.XmlElementNotifier;
 import org.apache.nifi.stream.io.BufferedInputStream;
@@ -69,6 +70,15 @@ import org.xml.sax.XMLReader;
 @Tags({"xml", "split"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Splits an XML File into multiple separate FlowFiles, each comprising a child or descendant of the original root element")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier",
+                description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
+        @WritesAttribute(attribute = "fragment.index",
+                description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count",
+                description = "The number of split FlowFiles generated from the parent FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile")
+})
 public class SplitXml extends AbstractProcessor {
 
     public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder()
@@ -146,24 +156,19 @@ public class SplitXml extends AbstractProcessor {
         final ComponentLog logger = getLogger();
         final List<FlowFile> splits = new ArrayList<>();
 
-        final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(new XmlElementNotifier() {
-            @Override
-            public void onXmlElementFound(final String xmlTree) {
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+        final AtomicInteger numberOfRecords = new AtomicInteger(0);
+        final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> {
                 FlowFile split = session.create(original);
-                split = session.write(split, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws IOException {
-                        out.write(xmlTree.getBytes("UTF-8"));
-                    }
-                });
+                split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8")));
+                split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
+                split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement()));
+                split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
                 splits.add(split);
-            }
         }, depth);
 
         final AtomicBoolean failed = new AtomicBoolean(false);
-        session.read(original, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream rawIn) throws IOException {
+        session.read(original, rawIn -> {
             try (final InputStream in = new BufferedInputStream(rawIn)) {
                 SAXParser saxParser = null;
                 try {
@@ -176,14 +181,17 @@ public class SplitXml extends AbstractProcessor {
                     failed.set(true);
                 }
             }
-            }
         });
 
         if (failed.get()) {
             session.transfer(original, REL_FAILURE);
             session.remove(splits);
         } else {
-            session.transfer(splits, REL_SPLIT);
+            splits.forEach((split) -> {
+                split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get()));
+                session.transfer(split, REL_SPLIT);
+            });
             session.transfer(original, REL_ORIGINAL);
             logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()});
         }
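SplitXml has to count indirectly because its splits are created inside a SAX callback: fragment.index is taken from numberOfRecords.getAndIncrement() as each element is found, while fragment.count is only known once parsing has finished, which is why it is stamped in the forEach just before transfer (SplitJson uses the same two-pass pattern with jsonLineCount). A minimal, self-contained illustration of that counter behavior, with a hypothetical class name:

import java.util.concurrent.atomic.AtomicInteger;

public class FragmentCounterDemo {
    public static void main(String[] args) {
        final AtomicInteger numberOfRecords = new AtomicInteger(0);
        for (int element = 0; element < 3; element++) {
            // getAndIncrement() returns the pre-increment value, so indices start at 0: 0, 1, 2
            System.out.println("fragment.index = " + numberOfRecords.getAndIncrement());
        }
        // once all elements have been seen, get() equals the total number of splits
        System.out.println("fragment.count = " + numberOfRecords.get()); // prints 3
    }
}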

TestSplitJson.java

@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
@@ -32,6 +33,7 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 
 public class TestSplitJson {
@@ -110,13 +112,26 @@ public class TestSplitJson {
         final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
         testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[*].name");
 
-        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.enqueue(JSON_SNIPPET, new HashMap<String, String>() {
+            {
+                put(CoreAttributes.FILENAME.key(), "test.json");
+            }
+        });
         testRunner.run();
 
         testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
         testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
         testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
-        testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
+        MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0);
+        flowFile.assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
+        flowFile.assertAttributeEquals("fragment.count", "7");
+        flowFile.assertAttributeEquals("fragment.index", "0");
+        flowFile.assertAttributeEquals("segment.original.filename", "test.json");
+        flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(6);
+        flowFile.assertAttributeEquals("fragment.count", "7");
+        flowFile.assertAttributeEquals("fragment.index", "6");
+        flowFile.assertAttributeEquals("segment.original.filename", "test.json");
     }
 
     @Test
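The assertions above pin down fragment.index, fragment.count, and segment.original.filename; fragment.identifier is a random UUID per run, so the strongest check available is that every split from one parent carries the same value. A sketch of such an assertion (not part of this commit), written to sit at the end of the test method above and assuming java.util.List is imported:

// Illustrative only: all splits from one parent should share one fragment.identifier.
final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT);
final String expectedIdentifier = splitFlowFiles.get(0).getAttribute("fragment.identifier");
for (final MockFlowFile splitFlowFile : splitFlowFiles) {
    splitFlowFile.assertAttributeEquals("fragment.identifier", expectedIdentifier);
}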

TestSplitXml.java

@@ -19,11 +19,14 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.io.StringReader;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import javax.xml.parsers.SAXParser;
 import javax.xml.parsers.SAXParserFactory;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -57,13 +60,23 @@ public class TestSplitXml {
     @Test
     public void testDepthOf1() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new SplitXml());
-        runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
+        runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"), new HashMap<String, String>() {
+            {
+                put(CoreAttributes.FILENAME.key(), "test.xml");
+            }
+        });
         runner.run();
         runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitXml.REL_SPLIT, 6);
         parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
         parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT));
+        Arrays.asList(0, 1, 2, 3, 4, 5).forEach((index) -> {
+            final MockFlowFile flowFile = runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT).get(index);
+            flowFile.assertAttributeEquals("fragment.index", Integer.toString(index));
+            flowFile.assertAttributeEquals("fragment.count", "6");
+            flowFile.assertAttributeEquals("segment.original.filename", "test.xml");
+        });
     }
 
     @Test
@@ -82,7 +95,7 @@ public class TestSplitXml {
     @Test
     public void testDepthOf3() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new SplitXml());
-        runner.setProperty(SplitXml.SPLIT_DEPTH, "2");
+        runner.setProperty(SplitXml.SPLIT_DEPTH, "3");
         runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
         runner.run();
         runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
@@ -98,7 +111,7 @@ public class TestSplitXml {
         // declarations are handled correctly.
         factory = SAXParserFactory.newInstance();
         factory.setNamespaceAware(true);
-        saxParser = factory.newSAXParser( );
+        saxParser = factory.newSAXParser();
 
         final TestRunner runner = TestRunners.newTestRunner(new SplitXml());
         runner.setProperty(SplitXml.SPLIT_DEPTH, "3");