diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index 678919c89f..8c59d88288 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -63,7 +64,7 @@ import org.apache.nifi.util.ObjectHolder; @SupportsBatching @Tags({"split", "text"}) @InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines") +@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines.") @WritesAttributes({ @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"), @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), @@ -82,7 +83,7 @@ public class SplitText extends AbstractProcessor { public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder() .name("Line Split Count") - .description("The number of lines that will be added to each split file") + .description("The number of lines that will be added to each split file (excluding the header, if the 
Header Line Count property is greater than 0).") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); @@ -95,7 +96,10 @@ public class SplitText extends AbstractProcessor { .build(); public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder() .name("Remove Trailing Newlines") - .description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later") + .description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later. If this is set to " + + "'true' and a FlowFile is generated that contains only 'empty lines' (i.e., consists only of \\r and \\n characters), the FlowFile will not be emitted. " + + "Note, however, that if the Header Line Count is greater than 0, the resultant FlowFile will never be empty as it will consist of the header lines, so " + + "a FlowFile may be emitted that contains only the header lines.") .required(true) .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .allowableValues("true", "false") @@ -143,51 +147,93 @@ public class SplitText extends AbstractProcessor { return properties; } - private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines) throws IOException { + private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines, final byte[] leadingNewLineBytes) throws IOException { + final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); + int numLines = 0; + byte[] leadingBytes = leadingNewLineBytes; for (int i = 0; i < maxNumLines; i++) { - final long bytes = countBytesToSplitPoint(in, out, keepAllNewLines || (i != maxNumLines - 1)); - if (bytes <= 0) { - return numLines; + final EndOfLineMarker eolMarker = locateEndOfLine(in, out, false, eolBuffer, leadingBytes); + leadingBytes = 
eolMarker.getLeadingNewLineBytes(); + + if (keepAllNewLines && out != null) { + if (leadingBytes != null) { + out.write(leadingBytes); + leadingBytes = null; + } + + eolBuffer.drainTo(out); } - numLines++; + if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() > 0L) { + numLines++; + } + + if (eolMarker.isStreamEnded()) { + break; + } } return numLines; } - private long countBytesToSplitPoint(final InputStream in, final OutputStream out, final boolean includeLineDelimiter) throws IOException { + private EndOfLineMarker locateEndOfLine(final InputStream in, final OutputStream out, final boolean includeLineDelimiter, + final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { + int lastByte = -1; long bytesRead = 0L; + byte[] bytesToWriteFirst = leadingNewLineBytes; while (true) { in.mark(1); final int nextByte = in.read(); + final boolean isNewLineChar = nextByte == '\r' || nextByte == '\n'; + // if we hit end of stream or new line we're done if (nextByte == -1) { if (lastByte == '\r') { - return includeLineDelimiter ? bytesRead : bytesRead - 1; - } else { - return bytesRead; + eolBuffer.addEndOfLine(true, false); } + + return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); + } + + // If we get a character that's not an end-of-line char, then + // we need to write out the EOL's that we have buffered (if out != null). + // Then, we need to reset our EOL buffer because we no longer have consecutive EOL's + if (!isNewLineChar) { + if (bytesToWriteFirst != null) { + if (out != null) { + out.write(bytesToWriteFirst); + } + + bytesToWriteFirst = null; + } + + if (out != null) { + eolBuffer.drainTo(out); + } + + eolBuffer.clear(); } // if there's an OutputStream to copy the data to, copy it, if appropriate. 
// "if appropriate" means that it's not a line delimiter or that we want to copy line delimiters bytesRead++; - if (out != null && (includeLineDelimiter || (nextByte != '\n' && nextByte != '\r'))) { + if (out != null && (includeLineDelimiter || !isNewLineChar)) { + if (bytesToWriteFirst != null) { + out.write(bytesToWriteFirst); + bytesToWriteFirst = null; + } + out.write(nextByte); } // if we have a new line, then we're done if (nextByte == '\n') { - if (includeLineDelimiter) { - return bytesRead; - } else { - return (lastByte == '\r') ? bytesRead - 2 : bytesRead - 1; - } + eolBuffer.addEndOfLine(lastByte == '\r', true); + return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); } // we didn't get a new line but if last byte was carriage return we've reached a new-line. @@ -195,7 +241,8 @@ public class SplitText extends AbstractProcessor { if (lastByte == '\r') { in.reset(); bytesRead--; // we reset the stream by 1 byte so decrement the number of bytes read by 1 - return includeLineDelimiter ? 
bytesRead : bytesRead - 1; + eolBuffer.addEndOfLine(true, false); + return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); } // keep track of what the last byte was that we read so that we can detect \r followed by some other @@ -204,17 +251,31 @@ public class SplitText extends AbstractProcessor { } } - private SplitInfo countBytesToSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException { - SplitInfo info = new SplitInfo(); + private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException { + final SplitInfo info = new SplitInfo(); + final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); while (info.lengthLines < numLines) { - final long bytesTillNext = countBytesToSplitPoint(in, null, keepAllNewLines || (info.lengthLines != numLines - 1)); - if (bytesTillNext <= 0L) { - break; - } + final boolean keepNewLine = keepAllNewLines || (info.lengthLines != numLines - 1); + + final EndOfLineMarker eolMarker = locateEndOfLine(in, null, keepNewLine, eolBuffer, null); + long bytesTillNext = eolMarker.getBytesConsumed(); info.lengthLines++; + + // if this is the last line in the split and we don't want to keep all lines (i.e., we want to remove trailing new lines), + // then decrement our lengthBytes by the number of buffered EOL bytes: each set bit is one CR or LF byte, so a CR+LF ending counts as two + final boolean isLastLine = eolMarker.isStreamEnded() || info.lengthLines >= numLines; + if (isLastLine && !keepAllNewLines) { + bytesTillNext -= eolBuffer.buffer.get(0, eolBuffer.index).cardinality(); + } + info.lengthBytes += bytesTillNext; + + if (eolMarker.isStreamEnded()) { + info.endOfStream = true; + break; + } } return info; @@ -245,12 +306,39 @@ public class SplitText extends AbstractProcessor { // if we have header lines, copy them into a ByteArrayOutputStream final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); - final int headerLinesCopied = readLines(in, headerCount, headerStream, true); + + final int headerLinesCopied = readLines(in, headerCount, 
headerStream, true, null); if (headerLinesCopied < headerCount) { errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines"); return; } + // Break header apart into its trailing newline bytes and the remaining header bytes + final byte[] headerBytes = headerStream.toByteArray(); + int headerNewLineByteCount = 0; + for (int i = headerBytes.length - 1; i >= 0; i--) { + final byte headerByte = headerBytes[i]; + + if (headerByte == '\r' || headerByte == '\n') { + headerNewLineByteCount++; + } else { + break; + } + } + + final byte[] headerNewLineBytes; + final byte[] headerBytesWithoutTrailingNewLines; + if (headerNewLineByteCount == 0) { + headerNewLineBytes = null; + headerBytesWithoutTrailingNewLines = headerBytes; + } else { + headerNewLineBytes = new byte[headerNewLineByteCount]; + System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount); + + headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount]; + System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount); + } + while (true) { if (headerCount > 0) { // if we have header lines, create a new FlowFile, copy the header lines to that file, @@ -262,8 +350,8 @@ public class SplitText extends AbstractProcessor { @Override public void process(final OutputStream rawOut) throws IOException { try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { - headerStream.writeTo(out); - linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines)); + out.write(headerBytesWithoutTrailingNewLines); + linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines, headerNewLineBytes)); } } }); @@ -287,11 +375,8 @@ public class SplitText extends AbstractProcessor { // We have no header lines, so we can simply demarcate the original File via the // ProcessSession#clone method. 
long beforeReadingLines = in.getBytesConsumed(); - final SplitInfo info = countBytesToSplitPoint(in, splitCount, !removeTrailingNewlines); - if (info.lengthBytes == 0) { - // stream is out of data - break; - } else { + final SplitInfo info = locateSplitPoint(in, splitCount, !removeTrailingNewlines); + if (info.lengthBytes > 0) { info.offsetBytes = beforeReadingLines; splitInfos.add(info); final long procNanos = System.nanoTime() - startNanos; @@ -300,6 +385,10 @@ public class SplitText extends AbstractProcessor { + "total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); } + + if (info.endOfStream) { + break; + } } } } @@ -354,16 +443,17 @@ public class SplitText extends AbstractProcessor { } private static class SplitInfo { - public long offsetBytes; public long lengthBytes; public long lengthLines; + public boolean endOfStream; public SplitInfo() { super(); this.offsetBytes = 0L; this.lengthBytes = 0L; this.lengthLines = 0L; + this.endOfStream = false; } @SuppressWarnings("unused") @@ -374,4 +464,80 @@ public class SplitText extends AbstractProcessor { this.lengthLines = lengthLines; } } + + public static class EndOfLineBuffer { + private static final byte CARRIAGE_RETURN = (byte) '\r'; + private static final byte NEWLINE = (byte) '\n'; + + private final BitSet buffer = new BitSet(); + private int index = 0; + + public void clear() { + index = 0; + } + + public void addEndOfLine(final boolean carriageReturn, final boolean newLine) { + buffer.set(index++, carriageReturn); + buffer.set(index++, newLine); + } + + private void drainTo(final OutputStream out) throws IOException { + for (int i = 0; i < index; i += 2) { + final boolean cr = buffer.get(i); + final boolean nl = buffer.get(i + 1); + + // we've consumed all data in the buffer + if (!cr && !nl) { + return; + } + + if (cr) { + out.write(CARRIAGE_RETURN); + } + + if (nl) { + out.write(NEWLINE); + } + } + + clear(); + } + + 
/** + * @return the number of line endings in the buffer + */ + public int length() { + return index / 2; + } + } + + public static class EndOfLineMarker { + private final long bytesConsumed; + private final EndOfLineBuffer eolBuffer; + private final boolean streamEnded; + private final byte[] leadingNewLineBytes; + + public EndOfLineMarker(final long bytesCounted, final EndOfLineBuffer eolBuffer, final boolean streamEnded, final byte[] leadingNewLineBytes) { + this.bytesConsumed = bytesCounted; + this.eolBuffer = eolBuffer; + this.streamEnded = streamEnded; + this.leadingNewLineBytes = leadingNewLineBytes; + } + + public long getBytesConsumed() { + return bytesConsumed; + } + + public EndOfLineBuffer getEndOfLineBuffer() { + return eolBuffer; + } + + public boolean isStreamEnded() { + return streamEnded; + } + + public byte[] getLeadingNewLineBytes() { + return leadingNewLineBytes; + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java index aa28cc01f0..f8b75e169a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java @@ -35,6 +35,8 @@ public class TestSplitText { final String originalFilename = "original.txt"; final Path dataPath = Paths.get("src/test/resources/TestSplitText"); final Path file = dataPath.resolve(originalFilename); + final static String TEST_INPUT_DATA = "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n\nLine8\nLine9\n\n\n13\n14\n15 EndofLine15\n16\n" + + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nLastLine\n"; @Test public void 
testRoutesToFailureIfHeaderLinesNotAllPresent() throws IOException { @@ -84,6 +86,171 @@ public class TestSplitText { splits.get(3).assertContentEquals(expected3); } + @Test + public void testOneLineSplitWithoutHeader() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new SplitText()); + runner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + runner.setProperty(SplitText.LINE_SPLIT_COUNT, "1"); + + runner.enqueue(TEST_INPUT_DATA); + runner.run(); + + runner.assertTransferCount(SplitText.REL_FAILURE, 0); + runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitText.REL_SPLITS, 11); + + final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + + splits.get(0).assertContentEquals("HeaderLine1"); + splits.get(1).assertContentEquals("Line2SpacesAtEnd "); + splits.get(2).assertContentEquals("Line3"); + splits.get(3).assertContentEquals("Line4"); + splits.get(4).assertContentEquals("Line8"); + splits.get(5).assertContentEquals("Line9"); + splits.get(6).assertContentEquals("13"); + splits.get(7).assertContentEquals("14"); + splits.get(8).assertContentEquals("15 EndofLine15"); + splits.get(9).assertContentEquals("16"); + splits.get(10).assertContentEquals("LastLine"); + } + + @Test + public void testFiveLineSplitWithoutHeader() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new SplitText()); + runner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + runner.setProperty(SplitText.LINE_SPLIT_COUNT, "5"); + + runner.enqueue(TEST_INPUT_DATA); + runner.run(); + + runner.assertTransferCount(SplitText.REL_FAILURE, 0); + runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitText.REL_SPLITS, 4); + + final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + + splits.get(0).assertContentEquals("HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4"); + splits.get(1).assertContentEquals("\nLine8\nLine9"); + 
splits.get(2).assertContentEquals("13\n14\n15 EndofLine15\n16"); + splits.get(3).assertContentEquals("\n\nLastLine"); + } + + @Test + public void testFiveLineSplitWithoutHeaderRetainNewline() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new SplitText()); + runner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + runner.setProperty(SplitText.LINE_SPLIT_COUNT, "5"); + runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false"); + + runner.enqueue(TEST_INPUT_DATA); + runner.run(); + + runner.assertTransferCount(SplitText.REL_FAILURE, 0); + runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitText.REL_SPLITS, 10); + + final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + + splits.get(0).assertContentEquals("HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n"); + splits.get(1).assertContentEquals("\nLine8\nLine9\n\n\n"); + splits.get(2).assertContentEquals("13\n14\n15 EndofLine15\n16\n\n"); + splits.get(3).assertContentEquals("\n\n\n\n\n"); + splits.get(4).assertContentEquals("\n\n\n\n\n"); + splits.get(5).assertContentEquals("\n\n\n\n\n"); + splits.get(6).assertContentEquals("\n\n\n\n\n"); + splits.get(7).assertContentEquals("\n\n\n\n\n"); + splits.get(8).assertContentEquals("\n\n\n\n\n"); + splits.get(9).assertContentEquals("\n\nLastLine\n"); + } + + @Test + public void testFiveLineSplitWithHeaderRetainNewline() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new SplitText()); + runner.setProperty(SplitText.HEADER_LINE_COUNT, "1"); + runner.setProperty(SplitText.LINE_SPLIT_COUNT, "5"); + runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false"); + runner.enqueue(TEST_INPUT_DATA); + runner.run(); + + runner.assertTransferCount(SplitText.REL_FAILURE, 0); + runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitText.REL_SPLITS, 10); + + final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); 
+ + splits.get(0).assertContentEquals("HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n\n"); + splits.get(1).assertContentEquals("HeaderLine1\nLine8\nLine9\n\n\n13\n"); + splits.get(2).assertContentEquals("HeaderLine1\n14\n15 EndofLine15\n16\n\n\n"); + splits.get(3).assertContentEquals("HeaderLine1\n\n\n\n\n\n"); + splits.get(4).assertContentEquals("HeaderLine1\n\n\n\n\n\n"); + splits.get(5).assertContentEquals("HeaderLine1\n\n\n\n\n\n"); + splits.get(6).assertContentEquals("HeaderLine1\n\n\n\n\n\n"); + splits.get(7).assertContentEquals("HeaderLine1\n\n\n\n\n\n"); + splits.get(8).assertContentEquals("HeaderLine1\n\n\n\n\n\n"); + splits.get(9).assertContentEquals("HeaderLine1\n\nLastLine\n"); + } + + @Test + public void testFiveLineSplitWithHeaderNotRetainNewline() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new SplitText()); + runner.setProperty(SplitText.HEADER_LINE_COUNT, "1"); + runner.setProperty(SplitText.LINE_SPLIT_COUNT, "5"); + runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + runner.enqueue(TEST_INPUT_DATA); + runner.run(); + + runner.assertTransferCount(SplitText.REL_FAILURE, 0); + runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitText.REL_SPLITS, 10); + + final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4"); + splits.get(1).assertContentEquals("HeaderLine1\nLine8\nLine9\n\n\n13"); + splits.get(2).assertContentEquals("HeaderLine1\n14\n15 EndofLine15\n16"); + splits.get(3).assertContentEquals("HeaderLine1"); + splits.get(4).assertContentEquals("HeaderLine1"); + splits.get(5).assertContentEquals("HeaderLine1"); + splits.get(6).assertContentEquals("HeaderLine1"); + splits.get(7).assertContentEquals("HeaderLine1"); + splits.get(8).assertContentEquals("HeaderLine1"); + splits.get(9).assertContentEquals("HeaderLine1\n\nLastLine"); + } + + @Test + public void 
testOneLineSplitWithHeader() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new SplitText()); + runner.setProperty(SplitText.HEADER_LINE_COUNT, "1"); + runner.setProperty(SplitText.LINE_SPLIT_COUNT, "1"); + + runner.enqueue(TEST_INPUT_DATA); + runner.run(); + + runner.assertTransferCount(SplitText.REL_FAILURE, 0); + runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitText.REL_SPLITS, 47); + + final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + + splits.get(0).assertContentEquals("HeaderLine1\nLine2SpacesAtEnd "); + splits.get(1).assertContentEquals("HeaderLine1\nLine3"); + splits.get(2).assertContentEquals("HeaderLine1\nLine4"); + splits.get(3).assertContentEquals("HeaderLine1"); + splits.get(4).assertContentEquals("HeaderLine1"); + splits.get(5).assertContentEquals("HeaderLine1\nLine8"); + splits.get(6).assertContentEquals("HeaderLine1\nLine9"); + splits.get(7).assertContentEquals("HeaderLine1"); + splits.get(8).assertContentEquals("HeaderLine1"); + splits.get(9).assertContentEquals("HeaderLine1\n13"); + splits.get(10).assertContentEquals("HeaderLine1\n14"); + splits.get(11).assertContentEquals("HeaderLine1\n15 EndofLine15"); + splits.get(12).assertContentEquals("HeaderLine1\n16"); + for (int i = 13; i < 46; i++) { + splits.get(i).assertContentEquals("HeaderLine1"); + } + splits.get(46).assertContentEquals("HeaderLine1\nLastLine"); + } + @Test public void testSplitWithTwoLineHeader() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new SplitText()); @@ -160,4 +327,196 @@ public class TestSplitText { assertEquals(Files.size(dataPath.resolve(originalFilename)), flowFile.getSize()); flowFile.assertContentEquals(file); } + + + /* + * If an input FlowFile has a number of blank lines greater than the Line Split Count property, + * ensure that the remainder of the FlowFile will be processed, resulting in no data loss. 
+ */ + @Test + public void testSplitWithOnlyNewLines() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("H1\nH2\n1\n2\n3\n\n\n\n\n\n\n10\n11\n12\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 4); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("H1\nH2\n1\n2\n3"); + splits.get(1).assertContentEquals("H1\nH2"); + splits.get(2).assertContentEquals("H1\nH2"); + splits.get(3).assertContentEquals("H1\nH2\n10\n11\n12"); + + splitRunner.clearTransferState(); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("1\n2\n3\n\n\n\n\n\n\n10\n11\n12\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splitsWithNoHeader = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splitsWithNoHeader.get(0).assertContentEquals("1\n2\n3"); + splitsWithNoHeader.get(1).assertContentEquals("10\n11\n12"); + + } + + /* + * If an input FlowFile has X blank lines at the end of a file and Line Split Count is + * greater than X, verify that newlines are removed. 
+ */ + @Test + public void testWithLotsOfBlankLinesAtEnd() { + // verify with header lines + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "10"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("H1\nH2\n1\n\n\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 1); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("H1\nH2\n1"); + + // verify without headers + splitRunner.clearTransferState(); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + + splitRunner.enqueue("1\n2\n\n\n\n"); + splitRunner.run(); + + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 1); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splitsWithNoHeader = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splitsWithNoHeader.get(0).assertContentEquals("1\n2"); + } + + /* + * If an input FlowFile has X blank lines at the end of a file and Header Line Count = 0, + * ensure all newlines removed from end of file. Previous behavior was: In the case where X is greater than + * Line Split Count, there will be split files consisting of nothing but blank lines, + * specifically one fewer lines than Line Split Count (i.e. only the final newline character is removed). + * + * Ensure that the above behavior is no longer reflected by the Processor. 
+ */ + @Test + public void testAllNewLinesTrimmed() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("1\n2\n\n\n\n\n\n\n\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 1); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("1\n2"); + } + + + /* + * Previous behavior that was exhibited: + * If an input FlowFile has X blank lines at the end of a file and + * Header Line Count = 1 (or any non-zero value), the blank lines + * are removed and no split file of just blanks is created. However, + * the final line does contain a newline character. In other split + * files, the final line has the newline character removed. + * + * Ensure that this behavior has been addressed. The Split file that + * does contain content should not have the trailing new line. The + * last FlowFile should be generated, containing nothing. 
+ */ + @Test + public void testConsistentTrailingOfNewLines() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("H1\n1\n\n\n\n\n\n\n\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 3); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("H1\n1"); + splits.get(1).assertContentEquals("H1"); + splits.get(2).assertContentEquals("H1"); + } + + @Test + public void testWithSplitThatStartsWithNewLine() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("H1\n1\n2\n3\n\n\n4\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("H1\n1\n2\n3"); + splits.get(1).assertContentEquals("H1\n\n\n4"); + + splitRunner.clearTransferState(); + + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + splitRunner.enqueue("1\n2\n3\n\n\n4\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splitsWithoutHeader = 
splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splitsWithoutHeader.get(0).assertContentEquals("1\n2\n3"); + splitsWithoutHeader.get(1).assertContentEquals("\n\n4"); + } + + @Test + public void testWithEmptyHeaderLines() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("\n\n1\n\n\n\n\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + splits.get(0).assertContentEquals("\n\n1"); + splits.get(1).assertContentEquals(""); + } + }