From 844dbe4edbc1cc0d24abe394cd276b9925627240 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 17 Jul 2017 09:23:55 -0400 Subject: [PATCH] NIFI-4156: Fixed fragment.count in SplitText to equal emitted flow files Signed-off-by: Pierre Villard This closes #2014. --- .../nifi/processors/standard/SplitText.java | 14 +++++--- .../processors/standard/TestSplitText.java | 33 ++++++++++++++++++- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index 5738632628..95accf475a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -298,7 +298,7 @@ public class SplitText extends AbstractProcessor { if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) { FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); splitFlowFile = this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(), - fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + fragmentId, fragmentIndex++, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } else { for (SplitInfo computedSplitInfo : computedSplitsInfo) { @@ -318,10 +318,14 @@ public class SplitText extends AbstractProcessor { } splitFlowFile = this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++, - computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } } + // Update fragment.count with real split count (i.e. don't count files for which there was no clone) + for (FlowFile splitFlowFile : splitFlowFiles) { + splitFlowFile = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index starts at 1 (see above) + } } getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : ".")); @@ -356,13 +360,12 @@ public class SplitText extends AbstractProcessor { } private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize, - String splitId, int splitIndex, int splitCount, String origFileName) { + String splitId, int splitIndex, String origFileName) { Map attributes = new HashMap<>(); attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount)); attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize())); attributes.put(FRAGMENT_ID, splitId); attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex)); - attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount)); attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName); return processSession.putAllAttributes(splitFlowFile, attributes); } @@ -424,6 +427,7 @@ public class SplitText extends AbstractProcessor { SplitInfo remainderSplitInfo, long startingLength) throws IOException { long length = 0; long trailingCrlfLength = 0; + long trailingLineCount = 0; long actualLineCount = 0; OffsetInfo offsetInfo = null; SplitInfo splitInfo = null; @@ -440,6 +444,7 @@ public class SplitText extends AbstractProcessor { if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) { trailingCrlfLength += offsetInfo.getCrlfLength(); + trailingLineCount++; } else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) { trailingCrlfLength = 0; // non-empty line came in, thus resetting counter } @@ -465,6 +470,7 @@ public class SplitText extends AbstractProcessor { if (length - trailingCrlfLength >= lastCrlfLength) { trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line } + actualLineCount -= trailingLineCount; splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo); } return splitInfo; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java index 8e4c8815d6..f42b1d3c15 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java @@ -771,7 +771,38 @@ public class TestSplitText { splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); - splits.get(0).assertContentEquals("1\n2"); + MockFlowFile split0 = splits.get(0); + split0.assertContentEquals("1\n2"); + split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1"); + split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "1"); + split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "2"); + } + + @Test + public void testFragmentCountIsActualFlowFileCount() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("1\n2\n\n\n\n\n\n\n\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + MockFlowFile split0 = splits.get(0); + split0.assertContentEquals("1"); + split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1"); + split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); + split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1"); + MockFlowFile split1 = splits.get(1); + split1.assertContentEquals("2"); + split1.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "2"); + split1.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); + split1.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1"); }