diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index 58b3adc9f4..caa5a0bb27 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -25,6 +23,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; @@ -33,18 +33,20 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import 
org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import com.jayway.jsonpath.DocumentContext; @@ -61,6 +63,16 @@ import com.jayway.jsonpath.PathNotFoundException; + "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split,' " + "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or " + "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.") +@WritesAttributes({ + @WritesAttribute(attribute = "fragment.identifier", + description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", + description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", + description = "The number of split FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile") +}) + public class SplitJson extends AbstractJsonPathProcessor { public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder() @@ -184,20 +196,29 @@ public class SplitJson extends AbstractJsonPathProcessor { } List resultList = (List) jsonPathResult; + AtomicInteger jsonLineCount = new AtomicInteger(0); - for (final Object resultSegment : resultList) { + final String fragmentIdentifier = UUID.randomUUID().toString(); + for (int i = 0; i < resultList.size(); i++) { + Object resultSegment = resultList.get(i); FlowFile split = processSession.create(original); - split = 
processSession.write(split, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue); - out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); - } - }); + split = processSession.write(split, (out) -> { + String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue); + out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); + } + ); + split = processSession.putAttribute(split, "fragment.identifier", fragmentIdentifier); + split = processSession.putAttribute(split, "fragment.index", Integer.toString(i)); + split = processSession.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key())); segments.add(split); + jsonLineCount.incrementAndGet(); } - processSession.transfer(segments, REL_SPLIT); + segments.forEach((segment) -> { + segment = processSession.putAttribute(segment, "fragment.count", Integer.toString(jsonLineCount.get())); + processSession.transfer(segment, REL_SPLIT); + }); + processSession.transfer(original, REL_ORIGINAL); logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()}); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java index 4764ea8891..0f0032a9ca 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java @@ -16,9 +16,7 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import 
java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -27,7 +25,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.SAXParser; @@ -39,18 +39,19 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.XmlElementNotifier; import org.apache.nifi.stream.io.BufferedInputStream; @@ -69,6 +70,15 @@ import org.xml.sax.XMLReader; @Tags({"xml", "split"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Splits an XML File into multiple separate FlowFiles, each comprising a child or descendant of the original root element") +@WritesAttributes({ + 
@WritesAttribute(attribute = "fragment.identifier", + description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", + description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", + description = "The number of split FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile") +}) public class SplitXml extends AbstractProcessor { public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder() @@ -146,35 +156,29 @@ public class SplitXml extends AbstractProcessor { final ComponentLog logger = getLogger(); final List splits = new ArrayList<>(); - final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(new XmlElementNotifier() { - @Override - public void onXmlElementFound(final String xmlTree) { - FlowFile split = session.create(original); - split = session.write(split, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(xmlTree.getBytes("UTF-8")); - } - }); - splits.add(split); - } + final String fragmentIdentifier = UUID.randomUUID().toString(); + final AtomicInteger numberOfRecords = new AtomicInteger(0); + final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> { + FlowFile split = session.create(original); + split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8"))); + split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier); + split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement())); + split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key())); + splits.add(split); 
}, depth); final AtomicBoolean failed = new AtomicBoolean(false); - session.read(original, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - SAXParser saxParser = null; - try { - saxParser = saxParserFactory.newSAXParser(); - final XMLReader reader = saxParser.getXMLReader(); - reader.setContentHandler(parser); - reader.parse(new InputSource(in)); - } catch (final ParserConfigurationException | SAXException e) { - logger.error("Unable to parse {} due to {}", new Object[]{original, e}); - failed.set(true); - } + session.read(original, rawIn -> { + try (final InputStream in = new BufferedInputStream(rawIn)) { + SAXParser saxParser = null; + try { + saxParser = saxParserFactory.newSAXParser(); + final XMLReader reader = saxParser.getXMLReader(); + reader.setContentHandler(parser); + reader.parse(new InputSource(in)); + } catch (final ParserConfigurationException | SAXException e) { + logger.error("Unable to parse {} due to {}", new Object[]{original, e}); + failed.set(true); } } }); @@ -183,7 +187,11 @@ public class SplitXml extends AbstractProcessor { session.transfer(original, REL_FAILURE); session.remove(splits); } else { - session.transfer(splits, REL_SPLIT); + splits.forEach((split) -> { + split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get())); + session.transfer(split, REL_SPLIT); + }); + session.transfer(original, REL_ORIGINAL); logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()}); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java index fc07386415..4e0d99927f 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.OutputStreamCallback; @@ -32,6 +33,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; public class TestSplitJson { @@ -110,13 +112,26 @@ public class TestSplitJson { final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson()); testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[*].name"); - testRunner.enqueue(JSON_SNIPPET); + testRunner.enqueue(JSON_SNIPPET, new HashMap() { + { + put(CoreAttributes.FILENAME.key(), "test.json"); + } + }); testRunner.run(); testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1); testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7); testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET); - testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}"); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0); + flowFile.assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}"); + flowFile.assertAttributeEquals("fragment.count", "7"); + flowFile.assertAttributeEquals("fragment.index", "0"); + flowFile.assertAttributeEquals("segment.original.filename", "test.json"); + + flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(6); + 
flowFile.assertAttributeEquals("fragment.count", "7"); + flowFile.assertAttributeEquals("fragment.index", "6"); + flowFile.assertAttributeEquals("segment.original.filename", "test.json"); } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java index 1815b3b6b2..2157ab853a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java @@ -19,11 +19,14 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.io.StringReader; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -57,13 +60,23 @@ public class TestSplitXml { @Test public void testDepthOf1() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new SplitXml()); - runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1")); + runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"), new HashMap() { + { + put(CoreAttributes.FILENAME.key(), "test.xml"); + } + }); runner.run(); runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1); runner.assertTransferCount(SplitXml.REL_SPLIT, 6); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL)); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT)); + Arrays.asList(0, 1, 2, 3, 4, 5).forEach((index) -> { + final 
MockFlowFile flowFile = runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT).get(index); + flowFile.assertAttributeEquals("fragment.index", Integer.toString(index)); + flowFile.assertAttributeEquals("fragment.count", "6"); + flowFile.assertAttributeEquals("segment.original.filename", "test.xml"); + }); } @Test @@ -82,7 +95,7 @@ public class TestSplitXml { @Test public void testDepthOf3() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new SplitXml()); - runner.setProperty(SplitXml.SPLIT_DEPTH, "2"); + runner.setProperty(SplitXml.SPLIT_DEPTH, "3"); runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1")); runner.run(); runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1); @@ -98,7 +111,7 @@ public class TestSplitXml { // declarations are handled correctly. factory = SAXParserFactory.newInstance(); factory.setNamespaceAware(true); - saxParser = factory.newSAXParser( ); + saxParser = factory.newSAXParser(); final TestRunner runner = TestRunners.newTestRunner(new SplitXml()); runner.setProperty(SplitXml.SPLIT_DEPTH, "3");