NIFI-1649: Addressed bug in SplitText that resulted in improper handling of input data if Remove Trailing Newlines is true and there is a FlowFile that is only made up of newlines

This commit is contained in:
Mark Payne 2016-03-21 13:26:32 -04:00
parent 38c782c30b
commit 736896246c
2 changed files with 559 additions and 34 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -63,7 +64,7 @@ import org.apache.nifi.util.ObjectHolder;
@SupportsBatching
@Tags({"split", "text"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines")
@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines.")
@WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"),
@WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@ -82,7 +83,7 @@ public class SplitText extends AbstractProcessor {
public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
.name("Line Split Count")
.description("The number of lines that will be added to each split file")
.description("The number of lines that will be added to each split file (excluding the header, if the Header Line Count property is greater than 0).")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
@ -95,7 +96,10 @@ public class SplitText extends AbstractProcessor {
.build();
public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder()
.name("Remove Trailing Newlines")
.description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later")
.description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later. If this is set to "
+ "'true' and a FlowFile is generated that contains only 'empty lines' (i.e., consists only of \r and \n characters), the FlowFile will not be emitted. "
+ "Note, however, that if the Header Line Count is greater than 0, the resultant FlowFile will never be empty as it will consist of the header lines, so "
+ "a FlowFile may be emitted that contains only the header lines.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
@ -143,51 +147,93 @@ public class SplitText extends AbstractProcessor {
return properties;
}
private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines) throws IOException {
private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines, final byte[] leadingNewLineBytes) throws IOException {
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
int numLines = 0;
byte[] leadingBytes = leadingNewLineBytes;
for (int i = 0; i < maxNumLines; i++) {
final long bytes = countBytesToSplitPoint(in, out, keepAllNewLines || (i != maxNumLines - 1));
if (bytes <= 0) {
return numLines;
final EndOfLineMarker eolMarker = locateEndOfLine(in, out, false, eolBuffer, leadingBytes);
leadingBytes = eolMarker.getLeadingNewLineBytes();
if (keepAllNewLines && out != null) {
if (leadingBytes != null) {
out.write(leadingBytes);
leadingBytes = null;
}
eolBuffer.drainTo(out);
}
numLines++;
if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() > 0L) {
numLines++;
}
if (eolMarker.isStreamEnded()) {
break;
}
}
return numLines;
}
private long countBytesToSplitPoint(final InputStream in, final OutputStream out, final boolean includeLineDelimiter) throws IOException {
private EndOfLineMarker locateEndOfLine(final InputStream in, final OutputStream out, final boolean includeLineDelimiter,
final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
int lastByte = -1;
long bytesRead = 0L;
byte[] bytesToWriteFirst = leadingNewLineBytes;
while (true) {
in.mark(1);
final int nextByte = in.read();
final boolean isNewLineChar = nextByte == '\r' || nextByte == '\n';
// if we hit end of stream or new line we're done
if (nextByte == -1) {
if (lastByte == '\r') {
return includeLineDelimiter ? bytesRead : bytesRead - 1;
} else {
return bytesRead;
eolBuffer.addEndOfLine(true, false);
}
return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);
}
// If we get a character that's not an end-of-line char, then
// we need to write out the EOL's that we have buffered (if out != null).
// Then, we need to reset our EOL buffer because we no longer have consecutive EOL's
if (!isNewLineChar) {
if (bytesToWriteFirst != null) {
if (out != null) {
out.write(bytesToWriteFirst);
}
bytesToWriteFirst = null;
}
if (out != null) {
eolBuffer.drainTo(out);
}
eolBuffer.clear();
}
// if there's an OutputStream to copy the data to, copy it, if appropriate.
// "if appropriate" means that it's not a line delimiter or that we want to copy line delimiters
bytesRead++;
if (out != null && (includeLineDelimiter || (nextByte != '\n' && nextByte != '\r'))) {
if (out != null && (includeLineDelimiter || !isNewLineChar)) {
if (bytesToWriteFirst != null) {
out.write(bytesToWriteFirst);
bytesToWriteFirst = null;
}
out.write(nextByte);
}
// if we have a new line, then we're done
if (nextByte == '\n') {
if (includeLineDelimiter) {
return bytesRead;
} else {
return (lastByte == '\r') ? bytesRead - 2 : bytesRead - 1;
}
eolBuffer.addEndOfLine(lastByte == '\r', true);
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
}
// we didn't get a new line but if last byte was carriage return we've reached a new-line.
@ -195,7 +241,8 @@ public class SplitText extends AbstractProcessor {
if (lastByte == '\r') {
in.reset();
bytesRead--; // we reset the stream by 1 byte so decrement the number of bytes read by 1
return includeLineDelimiter ? bytesRead : bytesRead - 1;
eolBuffer.addEndOfLine(true, false);
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
}
// keep track of what the last byte was that we read so that we can detect \r followed by some other
@ -204,17 +251,31 @@ public class SplitText extends AbstractProcessor {
}
}
private SplitInfo countBytesToSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException {
SplitInfo info = new SplitInfo();
private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException {
final SplitInfo info = new SplitInfo();
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
while (info.lengthLines < numLines) {
final long bytesTillNext = countBytesToSplitPoint(in, null, keepAllNewLines || (info.lengthLines != numLines - 1));
if (bytesTillNext <= 0L) {
break;
}
final boolean keepNewLine = keepAllNewLines || (info.lengthLines != numLines - 1);
final EndOfLineMarker eolMarker = locateEndOfLine(in, null, keepNewLine, eolBuffer, null);
long bytesTillNext = eolMarker.getBytesConsumed();
info.lengthLines++;
// if this is the last line in the split and we don't want to keep all lines
// (i.e., we want to remove trailing new lines), then decrement our lengthBytes
final boolean isLastLine = eolMarker.isStreamEnded() || info.lengthLines >= numLines;
if (isLastLine && !keepAllNewLines) {
bytesTillNext -= eolBuffer.length();
}
info.lengthBytes += bytesTillNext;
if (eolMarker.isStreamEnded()) {
info.endOfStream = true;
break;
}
}
return info;
@ -245,12 +306,39 @@ public class SplitText extends AbstractProcessor {
// if we have header lines, copy them into a ByteArrayOutputStream
final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
final int headerLinesCopied = readLines(in, headerCount, headerStream, true);
final int headerLinesCopied = readLines(in, headerCount, headerStream, true, null);
if (headerLinesCopied < headerCount) {
errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines");
return;
}
// Break header apart into its trailing newline bytes and the header content that precedes them
final byte[] headerBytes = headerStream.toByteArray();
int headerNewLineByteCount = 0;
for (int i = headerBytes.length - 1; i >= 0; i--) {
final byte headerByte = headerBytes[i];
if (headerByte == '\r' || headerByte == '\n') {
headerNewLineByteCount++;
} else {
break;
}
}
final byte[] headerNewLineBytes;
final byte[] headerBytesWithoutTrailingNewLines;
if (headerNewLineByteCount == 0) {
headerNewLineBytes = null;
headerBytesWithoutTrailingNewLines = headerBytes;
} else {
headerNewLineBytes = new byte[headerNewLineByteCount];
System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
}
while (true) {
if (headerCount > 0) {
// if we have header lines, create a new FlowFile, copy the header lines to that file,
@ -262,8 +350,8 @@ public class SplitText extends AbstractProcessor {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
headerStream.writeTo(out);
linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines));
out.write(headerBytesWithoutTrailingNewLines);
linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines, headerNewLineBytes));
}
}
});
@ -287,11 +375,8 @@ public class SplitText extends AbstractProcessor {
// We have no header lines, so we can simply demarcate the original File via the
// ProcessSession#clone method.
long beforeReadingLines = in.getBytesConsumed();
final SplitInfo info = countBytesToSplitPoint(in, splitCount, !removeTrailingNewlines);
if (info.lengthBytes == 0) {
// stream is out of data
break;
} else {
final SplitInfo info = locateSplitPoint(in, splitCount, !removeTrailingNewlines);
if (info.lengthBytes > 0) {
info.offsetBytes = beforeReadingLines;
splitInfos.add(info);
final long procNanos = System.nanoTime() - startNanos;
@ -300,6 +385,10 @@ public class SplitText extends AbstractProcessor {
+ "total splits = {}; total processing time = {} ms",
new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
}
if (info.endOfStream) {
break;
}
}
}
}
@ -354,16 +443,17 @@ public class SplitText extends AbstractProcessor {
}
private static class SplitInfo {
public long offsetBytes;
public long lengthBytes;
public long lengthLines;
public boolean endOfStream;
public SplitInfo() {
super();
this.offsetBytes = 0L;
this.lengthBytes = 0L;
this.lengthLines = 0L;
this.endOfStream = false;
}
@SuppressWarnings("unused")
@ -374,4 +464,80 @@ public class SplitText extends AbstractProcessor {
this.lengthLines = lengthLines;
}
}
/**
 * Compact buffer of consecutive line endings.
 *
 * Every line ending occupies two adjacent bits in the backing BitSet: the first bit
 * records whether a carriage return was seen and the second whether a newline was seen.
 */
public static class EndOfLineBuffer {
    private static final byte CARRIAGE_RETURN = (byte) '\r';
    private static final byte NEWLINE = (byte) '\n';

    private final BitSet bits = new BitSet();
    // Position of the next free bit; always advanced in steps of two.
    private int writePosition = 0;

    /**
     * Forgets all buffered line endings. Only the write position is rewound; bits left
     * behind are overwritten by later calls to addEndOfLine before they can be read again.
     */
    public void clear() {
        writePosition = 0;
    }

    /**
     * Appends one line ending to the buffer.
     *
     * @param carriageReturn whether the line ending contains a carriage return
     * @param newLine whether the line ending contains a newline
     */
    public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
        bits.set(writePosition, carriageReturn);
        bits.set(writePosition + 1, newLine);
        writePosition += 2;
    }

    /**
     * Writes every buffered line ending to the given stream, in insertion order, and then
     * clears the buffer.
     *
     * @param out the stream that receives the line-ending bytes
     * @throws IOException if writing to the stream fails
     */
    private void drainTo(final OutputStream out) throws IOException {
        int readPosition = 0;
        while (readPosition < writePosition) {
            final boolean hasCarriageReturn = bits.get(readPosition);
            final boolean hasNewLine = bits.get(readPosition + 1);

            // An entry with neither flag set is treated as the end of the data; note that
            // the buffer is NOT cleared when we leave through this path.
            if (!(hasCarriageReturn || hasNewLine)) {
                return;
            }

            if (hasCarriageReturn) {
                out.write(CARRIAGE_RETURN);
            }
            if (hasNewLine) {
                out.write(NEWLINE);
            }

            readPosition += 2;
        }

        clear();
    }

    /**
     * NOTE(review): this is a count of line endings, not of bytes; a CR+LF pair counts as one.
     * locateSplitPoint subtracts this value from a byte count, which looks one byte short per
     * CR+LF ending - confirm before relying on it for CRLF input.
     *
     * @return the number of line endings in the buffer
     */
    public int length() {
        return writePosition / 2;
    }
}
/**
 * Immutable holder for the outcome of one end-of-line scan: how many bytes were consumed,
 * the end-of-line buffer that was in use, whether the end of the stream was reached, and
 * any leading newline bytes that were still waiting to be written (may be null).
 */
public static class EndOfLineMarker {
    private final long bytesConsumed;
    private final boolean streamEnded;
    private final EndOfLineBuffer eolBuffer;
    private final byte[] leadingNewLineBytes;

    /**
     * @param bytesCounted number of bytes consumed by the scan
     * @param eolBuffer the buffer of line endings associated with the scan
     * @param streamEnded whether the scan stopped because the stream ran out of data
     * @param leadingNewLineBytes newline bytes not yet written out, or null if there are none
     */
    public EndOfLineMarker(final long bytesCounted, final EndOfLineBuffer eolBuffer, final boolean streamEnded, final byte[] leadingNewLineBytes) {
        this.leadingNewLineBytes = leadingNewLineBytes;
        this.streamEnded = streamEnded;
        this.eolBuffer = eolBuffer;
        this.bytesConsumed = bytesCounted;
    }

    /** @return the newline bytes that were still pending, or null (the array is not copied) */
    public byte[] getLeadingNewLineBytes() {
        return leadingNewLineBytes;
    }

    /** @return true if the scan reached the end of the stream */
    public boolean isStreamEnded() {
        return streamEnded;
    }

    /** @return the end-of-line buffer supplied at construction */
    public EndOfLineBuffer getEndOfLineBuffer() {
        return eolBuffer;
    }

    /** @return the number of bytes consumed by the scan */
    public long getBytesConsumed() {
        return bytesConsumed;
    }
}
}

View File

@ -35,6 +35,8 @@ public class TestSplitText {
final String originalFilename = "original.txt";
final Path dataPath = Paths.get("src/test/resources/TestSplitText");
final Path file = dataPath.resolve(originalFilename);
final static String TEST_INPUT_DATA = "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n\nLine8\nLine9\n\n\n13\n14\n15 EndofLine15\n16\n"
+ "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nLastLine\n";
@Test
public void testRoutesToFailureIfHeaderLinesNotAllPresent() throws IOException {
@ -84,6 +86,171 @@ public class TestSplitText {
splits.get(3).assertContentEquals(expected3);
}
@Test
public void testOneLineSplitWithoutHeader() throws IOException {
    // No header, one line per split: each non-blank line of TEST_INPUT_DATA is expected
    // to come out as its own split, in order.
    final String[] expectedContents = {
        "HeaderLine1",
        "Line2SpacesAtEnd ",
        "Line3",
        "Line4",
        "Line8",
        "Line9",
        "13",
        "14",
        "15 EndofLine15",
        "16",
        "LastLine"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1");
    testRunner.enqueue(TEST_INPUT_DATA);
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_SPLITS, 11);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
@Test
public void testFiveLineSplitWithoutHeader() throws IOException {
    // No header, five lines per split; the expected contents carry no trailing newlines
    // and splits made up solely of blank lines are not expected at all.
    final String[] expectedContents = {
        "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4",
        "\nLine8\nLine9",
        "13\n14\n15 EndofLine15\n16",
        "\n\nLastLine"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "5");
    testRunner.enqueue(TEST_INPUT_DATA);
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_SPLITS, 4);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
@Test
public void testFiveLineSplitWithoutHeaderRetainNewline() throws IOException {
    // No header, five lines per split, trailing newlines kept: blank-only splits are
    // expected to be emitted as five newline characters each.
    final String fiveBlankLines = "\n\n\n\n\n";
    final String[] expectedContents = {
        "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n",
        "\nLine8\nLine9\n\n\n",
        "13\n14\n15 EndofLine15\n16\n\n",
        fiveBlankLines,
        fiveBlankLines,
        fiveBlankLines,
        fiveBlankLines,
        fiveBlankLines,
        fiveBlankLines,
        "\n\nLastLine\n"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "5");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
    testRunner.enqueue(TEST_INPUT_DATA);
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_SPLITS, 10);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
@Test
public void testFiveLineSplitWithHeaderRetainNewline() throws IOException {
    // One header line, five lines per split, trailing newlines kept: every expected split
    // starts with the header line followed by five lines of the input.
    final String headerPlusFiveBlankLines = "HeaderLine1\n\n\n\n\n\n";
    final String[] expectedContents = {
        "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n\n",
        "HeaderLine1\nLine8\nLine9\n\n\n13\n",
        "HeaderLine1\n14\n15 EndofLine15\n16\n\n\n",
        headerPlusFiveBlankLines,
        headerPlusFiveBlankLines,
        headerPlusFiveBlankLines,
        headerPlusFiveBlankLines,
        headerPlusFiveBlankLines,
        headerPlusFiveBlankLines,
        "HeaderLine1\n\nLastLine\n"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "5");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
    testRunner.enqueue(TEST_INPUT_DATA);
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_SPLITS, 10);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
@Test
public void testFiveLineSplitWithHeaderNotRetainNewline() throws IOException {
    // One header line, five lines per split, trailing newlines removed: splits whose body
    // is only blank lines are still expected, holding just the header line.
    final String headerOnly = "HeaderLine1";
    final String[] expectedContents = {
        "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4",
        "HeaderLine1\nLine8\nLine9\n\n\n13",
        "HeaderLine1\n14\n15 EndofLine15\n16",
        headerOnly,
        headerOnly,
        headerOnly,
        headerOnly,
        headerOnly,
        headerOnly,
        "HeaderLine1\n\nLastLine"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "5");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue(TEST_INPUT_DATA);
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_SPLITS, 10);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
@Test
public void testOneLineSplitWithHeader() throws IOException {
    // One header line, one line per split: every input line after the header is expected
    // to yield a split, and blank input lines yield a split holding only the header.
    final String headerOnly = "HeaderLine1";
    final String[] leadingExpectedContents = {
        "HeaderLine1\nLine2SpacesAtEnd ",
        "HeaderLine1\nLine3",
        "HeaderLine1\nLine4",
        headerOnly,
        headerOnly,
        "HeaderLine1\nLine8",
        "HeaderLine1\nLine9",
        headerOnly,
        headerOnly,
        "HeaderLine1\n13",
        "HeaderLine1\n14",
        "HeaderLine1\n15 EndofLine15",
        "HeaderLine1\n16"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1");
    testRunner.enqueue(TEST_INPUT_DATA);
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_SPLITS, 47);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);

    // splits 0..12 cover the first block of content and blank lines
    for (int i = 0; i < leadingExpectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(leadingExpectedContents[i]);
    }

    // splits 13..45 come from the long run of blank lines
    int position = 13;
    while (position < 46) {
        splitFlowFiles.get(position).assertContentEquals(headerOnly);
        position++;
    }

    splitFlowFiles.get(46).assertContentEquals("HeaderLine1\nLastLine");
}
@Test
public void testSplitWithTwoLineHeader() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
@ -160,4 +327,196 @@ public class TestSplitText {
assertEquals(Files.size(dataPath.resolve(originalFilename)), flowFile.getSize());
flowFile.assertContentEquals(file);
}
/*
* If an input FlowFile has a number of blank lines greater than the Line Split Count property,
* ensure that the remainder of the FlowFile will be processed, resulting in no data loss.
*/
@Test
public void testSplitWithOnlyNewLines() {
    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());

    // First pass: two header lines, three lines per split, trailing newlines removed.
    final String[] expectedWithHeader = {
        "H1\nH2\n1\n2\n3",
        "H1\nH2",
        "H1\nH2",
        "H1\nH2\n10\n11\n12"
    };

    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("H1\nH2\n1\n2\n3\n\n\n\n\n\n\n10\n11\n12\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 4);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> headerSplits = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedWithHeader.length; i++) {
        headerSplits.get(i).assertContentEquals(expectedWithHeader[i]);
    }

    // Second pass on the same runner: no header, same split size and newline handling.
    final String[] expectedWithoutHeader = {
        "1\n2\n3",
        "10\n11\n12"
    };

    testRunner.clearTransferState();
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("1\n2\n3\n\n\n\n\n\n\n10\n11\n12\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 2);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> headerlessSplits = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedWithoutHeader.length; i++) {
        headerlessSplits.get(i).assertContentEquals(expectedWithoutHeader[i]);
    }
}
/*
* If an input FlowFile has X blank lines at the end of a file and Line Split Count is
* greater than X, verify that newlines are removed.
*/
@Test
public void testWithLotsOfBlankLinesAtEnd() {
    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());

    // With two header lines: a single split is expected, stripped of its trailing blank lines.
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "10");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("H1\nH2\n1\n\n\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 1);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> headerSplits = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final MockFlowFile onlyHeaderSplit = headerSplits.get(0);
    onlyHeaderSplit.assertContentEquals("H1\nH2\n1");

    // Without header lines: only the header count is changed, the other properties carry over.
    testRunner.clearTransferState();
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.enqueue("1\n2\n\n\n\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 1);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> headerlessSplits = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final MockFlowFile onlyHeaderlessSplit = headerlessSplits.get(0);
    onlyHeaderlessSplit.assertContentEquals("1\n2");
}
/*
* If an input FlowFile has X blank lines at the end of a file and Header Line Count = 0,
* ensure all newlines removed from end of file. Previous behavior was: In the case where X is greater than
* Line Split Count, there will be split files consisting of nothing but blank lines,
* specifically one fewer lines than Line Split Count (i.e. only the final newline character is removed).
*
* Ensure that the above behavior is no longer reflected by the Processor.
*/
@Test
public void testAllNewLinesTrimmed() {
    // No header, three lines per split, trailing newlines removed: the long tail of blank
    // lines is expected to produce no additional splits.
    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("1\n2\n\n\n\n\n\n\n\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 1);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final MockFlowFile onlySplit = splitFlowFiles.get(0);
    onlySplit.assertContentEquals("1\n2");
}
/*
* Previous behavior that was exhibited:
* If an input FlowFile has X blank lines at the end of a file and
* Header Line Count = 1 (or any non-zero value), the blank lines
* are removed and no split file of just blanks is created. However,
* the final line does contain a newline character. In other split
* files, the final line has the newline character removed.
*
* Ensure that this behavior has been addressed. The Split file that
* does contain content should not have the trailing new line. The
* remaining FlowFiles should still be generated, containing only the header line.
*/
@Test
public void testConsistentTrailingOfNewLines() {
    // One header line, three lines per split, trailing newlines removed: the first split
    // ends without a newline and the blank-only splits hold just the header line.
    final String[] expectedContents = {
        "H1\n1",
        "H1",
        "H1"
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("H1\n1\n\n\n\n\n\n\n\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 3);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
@Test
public void testWithSplitThatStartsWithNewLine() {
    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());

    // With one header line: leading blank lines of the second split are expected to be kept.
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("H1\n1\n2\n3\n\n\n4\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 2);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> headerSplits = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final String[] expectedWithHeader = {"H1\n1\n2\n3", "H1\n\n\n4"};
    for (int i = 0; i < expectedWithHeader.length; i++) {
        headerSplits.get(i).assertContentEquals(expectedWithHeader[i]);
    }

    // Without a header: only the header count is changed, the other properties carry over.
    testRunner.clearTransferState();
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    testRunner.enqueue("1\n2\n3\n\n\n4\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 2);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> headerlessSplits = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final String[] expectedWithoutHeader = {"1\n2\n3", "\n\n4"};
    for (int i = 0; i < expectedWithoutHeader.length; i++) {
        headerlessSplits.get(i).assertContentEquals(expectedWithoutHeader[i]);
    }
}
@Test
public void testWithEmptyHeaderLines() {
    // Both header lines are blank here; the second expected split is empty content.
    final String[] expectedContents = {
        "\n\n1",
        ""
    };

    final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
    testRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
    testRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    testRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
    testRunner.enqueue("\n\n1\n\n\n\n\n");
    testRunner.run();

    testRunner.assertTransferCount(SplitText.REL_SPLITS, 2);
    testRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    testRunner.assertTransferCount(SplitText.REL_FAILURE, 0);

    final List<MockFlowFile> splitFlowFiles = testRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    for (int i = 0; i < expectedContents.length; i++) {
        splitFlowFiles.get(i).assertContentEquals(expectedContents[i]);
    }
}
}